On 2019-09-03 05:32, Euler Taveira wrote:
Em ter, 3 de set de 2019 às 00:16, Alexey Zagarin <zaga...@gmail.com> escreveu:

There are complaints in the log (both pub and sub) like:
ERROR: trying to store a heap tuple into wrong type of slot

I have no idea what causes that.

Yeah, I've seen that too. It was fixed by Alexey Kondratov, in line 955 of 0005-Row-filtering-for-logical-replication.patch it should be &TTSOpsHeapTuple instead of &TTSOpsVirtual.

Ops... exact. That was an oversight while poking with different types of slots.

OK, I'll consider Alexey Kondratov's set of patches as the current state-of-the-art then. (They still apply.)

I found a problem where I'm not sure it's a bug:

The attached bash script does a test by setting up pgbench tables on both master and replica, and then sets up logical replication for a slice of pgbench_accounts. Then it does a short pgbench run, and loops until the results become identical(ok) (or breaks out after a certain time (NOK=not ok)).

It turns out this did not work until I added a wait state after the CREATE SUBSCRIPTION. It always fails without the wait state, and always works with the wait state.

Do you agree this is a bug?


thanks (also to both Alexeys :))


Erik Rijkers


PS
by the way, this script won't run as-is on other machines; it has stuff particular to my local setup.


#!/bin/bash

# postgres binary compiled with 
#
# pgpatches/0130/logrep_rowfilter/20190902/v2-0001-Remove-unused-atttypmod-column-from-initial-table.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0002-Store-number-of-tuples-in-WalRcvExecResult.patch       
# pgpatches/0130/logrep_rowfilter/20190902/v2-0003-Refactor-function-create_estate_for_relation.patch     
# pgpatches/0130/logrep_rowfilter/20190902/v2-0004-Rename-a-WHERE-node.patch                              
# pgpatches/0130/logrep_rowfilter/20190902/v2-0005-Row-filtering-for-logical-replication.patch            
# pgpatches/0130/logrep_rowfilter/20190902/v2-0006-Print-publication-WHERE-condition-in-psql.patch        
# pgpatches/0130/logrep_rowfilter/20190902/v2-0007-Publication-where-condition-support-for-pg_dump.patch  
# pgpatches/0130/logrep_rowfilter/20190902/v2-0008-Debug-for-row-filtering.patch                          
# pgpatches/0130/logrep_rowfilter/20190902/v2-0009-Add-simple-BDR-test-for-row-filtering.patch
 

unset PGDATABASE PGPORT PGSERVICE
export PGDATABASE=postgres

root_dir=/tmp/cascade/logrep_rowfilter

mkdir -p $root_dir

BIN=$HOME/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin

export PATH=$BIN:$PATH

  initdb=$BIN/initdb
postgres=$BIN/postgres
  pg_ctl=$BIN/pg_ctl
baseport=6525
   port1=$(( $baseport + 0 )) 
   port2=$(( $baseport + 1 ))
 appname=rowfilter

num_instances=2
        scale=1      where="where (aid between 40000 and 50000-1)"
     #  scale=10     where="where (aid between 400000 and 400000+50000-1)"
      clients=64
     duration=20
         wait=10  BASTA_COUNT=40  #   7200   #  wait seconds in total  

if [[ -d $root_dir/instance1 ]]; then rm -rf $root_dir/instance1; fi
if [[ -d $root_dir/instance2 ]]; then rm -rf $root_dir/instance2; fi
if [[ -d $root_dir/instance1 ]]; then exit ; fi
if [[ -d $root_dir/instance2 ]]; then exit ; fi

devel_file=/tmp/bugs
echo filterbug>$devel_file

for n in `seq 1 $num_instances`
do
  instance=instance$n
  server_dir=$root_dir/$instance
  data_dir=$server_dir/data
  port=$(( $baseport + $n -1 ))
  logfile=$server_dir/logfile.$port
  echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file "
           $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file  &> /dev/null 
  ( $postgres  -D $data_dir -p $port \
    --wal_level=logical --logging_collector=on \
    --client_min_messages=warning \
    --log_directory=$server_dir --log_filename=logfile.${port} \
    --log_replication_commands=on & ) &> /dev/null
done 

echo "sleep 3s"
sleep 3

echo "
  drop table if exists pgbench_accounts;
  drop table if exists pgbench_branches;
  drop table if exists pgbench_tellers;
  drop table if exists pgbench_history;" | psql -qXp $port1 \
&& echo "
  drop table if exists pgbench_accounts;
  drop table if exists pgbench_branches;
  drop table if exists pgbench_tellers;
  drop table if exists pgbench_history;" | psql -qXp $port2 \
&& pgbench -p $port1 -qis $scale \
&& echo "alter table pgbench_history add column hid serial primary key;" \
  | psql -q1Xp $port1 && pg_dump -F c -p $port1 \
     --exclude-table-data=pgbench_history  \
     --exclude-table-data=pgbench_accounts \
     --exclude-table-data=pgbench_branches \
     --exclude-table-data=pgbench_tellers  \
   -t pgbench_history -t pgbench_accounts \
   -t pgbench_branches -t pgbench_tellers \
  | pg_restore -1 -p $port2 -d postgres


pub1=pub_${port1}_to_${port2}
sub1=sub_${port2}_fr_${port1}
echo -ne "
create publication $pub1;
alter  publication $pub1 add table pgbench_accounts $where ; --> where
alter  publication $pub1 add table pgbench_branches;
alter  publication $pub1 add table pgbench_tellers;
alter  publication $pub1 add table pgbench_history;
" | psql -p $port1 -aqtAX

###  sleep 10   # no need for a wait

echo "
create subscription $sub1 connection 'port=$port1 application_name=$appname'
       publication  $pub1 with(enabled=false);
alter  subscription $sub1 enable;" | psql -p $port2 -aqtAX

sleep 1   #  WAIT NECESSARY

echo "-- pgbench -p $port1 -c $clients -j 8 -T $duration -n postgres    #  scale $scale"
         pgbench -p $port1 -c $clients -j 8 -T $duration -n postgres    #  scale $scale
echo

echo "       accounts  branches   tellers   history"
echo "       --------- --------- --------- ---------"

 sql_a="select * from pgbench_accounts $where order by aid;"
 sql_b="select * from pgbench_branches        order by bid;"
 sql_t="select * from pgbench_tellers         order by tid;"
 sql_h="select * from pgbench_history         order by hid;"
sqlc_a="select count(*) from pgbench_accounts $where;"
sqlc_b="select count(*) from pgbench_branches       ;"
sqlc_t="select count(*) from pgbench_tellers        ;"
sqlc_h="select count(*) from pgbench_history        ;"

count=0

while [[ 1 -eq 1 ]]
do

  secs=$(( $count * $wait ))
  count=$(( $count + 1 ))

  # different values otherwise dontcare
  md5_6515=abc 
  md5_6516=xyz
  md5_a=$(echo "$sql_a"  | psql -qtAXp $port1 | md5sum | cut -b1-9)
  md5_b=$(echo "$sql_b"  | psql -qtAXp $port1 | md5sum | cut -b1-9)
  md5_t=$(echo "$sql_t"  | psql -qtAXp $port1 | md5sum | cut -b1-9)
  md5_h=$(echo "$sql_h"  | psql -qtAXp $port1 | md5sum | cut -b1-9)
  cnt_a=$(echo "$sqlc_a" | psql -qtAXp $port1)
  cnt_b=$(echo "$sqlc_b" | psql -qtAXp $port1)
  cnt_t=$(echo "$sqlc_t" | psql -qtAXp $port1)
  cnt_h=$(echo "$sqlc_h" | psql -qtAXp $port1)

  md5_1=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
# md5_1=$( echo "$md5_a $md5_b $md5_t"        | md5sum | cut -b1-9)
  echo      "$port1   $md5_a $md5_b $md5_t $md5_h    $md5_1      $cnt_a  $cnt_b  $cnt_t  $cnt_h"
# echo      "$port1   $md5_a $md5_b $md5_t           $md5_1      $cnt_a  $cnt_b  $cnt_t  $cnt_h"

  md5_a=$(echo "$sql_a"  | psql -qtAXp $port2 | md5sum | cut -b1-9)
  md5_b=$(echo "$sql_b"  | psql -qtAXp $port2 | md5sum | cut -b1-9)
  md5_t=$(echo "$sql_t"  | psql -qtAXp $port2 | md5sum | cut -b1-9)
  md5_h=$(echo "$sql_h"  | psql -qtAXp $port2 | md5sum | cut -b1-9)
  cnt_a=$(echo "$sqlc_a" | psql -qtAXp $port2)
  cnt_b=$(echo "$sqlc_b" | psql -qtAXp $port2)
  cnt_t=$(echo "$sqlc_t" | psql -qtAXp $port2)
  cnt_h=$(echo "$sqlc_h" | psql -qtAXp $port2)

  md5_2=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
# md5_2=$( echo "$md5_a $md5_b $md5_t"        | md5sum | cut -b1-9)

  echo      "$port2   $md5_a $md5_b $md5_t $md5_h    $md5_2      $cnt_a  $cnt_b  $cnt_t  $cnt_h              ($count / $secs sec)"
# echo      "$port2   $md5_a $md5_b $md5_t           $md5_2      $cnt_a  $cnt_b  $cnt_t  $cnt_h              ($count / $secs sec)"

  if [[ "$md5_1" == "$md5_2" ]]
  then
      echo "   ok (2)"
      break
  fi

  if [[ $secs -gt $BASTA_COUNT ]]
  then
    echo "    NOK but BASTA   (count is $count, secs is $secs)"
    # dump some detail of the pgbench_history diff:
    for p in 6525 6526
    do
      echo "select current_setting('port'), count(*) from pgbench_history;" | psql -qX -d postgres -p $p && \
      echo "select * from pgbench_history order by hid" | psql -qX -d postgres -p $p > out_port_$p.txt
     
    done
    break
  fi

  echo " NOK   (${wait}s)"
  echo

  sleep $wait

done
echo

md5sum out_port_*.txt
echo

wc -l out_port_*.txt | grep -Ev total
echo

if [[ 1 -eq 1 ]]; then

#  stop instances
for n in `seq 1 $num_instances`
do
  instance=instance$n
  server_dir=$root_dir/$instance
  data_dir=$server_dir/data
  port=$(( $baseport + $n - 1 ))
  logfile=$server_dir/logfile.$port
  $pg_ctl stop -D $data_dir --mode=fast --wait
done

fi

if [[ 0 -eq 1 ]]; then

  #  delete everything
  echo "rm -rf /tmp/cascade/logrep_rowfilter/instance*"
        rm -rf /tmp/cascade/logrep_rowfilter/instance*

fi

Reply via email to