On 2019-09-03 05:32, Euler Taveira wrote:
Em ter, 3 de set de 2019 às 00:16, Alexey Zagarin <zaga...@gmail.com>
escreveu:
There are complaints in the log (both pub and sub) like:
ERROR: trying to store a heap tuple into wrong type of slot
I have no idea what causes that.
Yeah, I've seen that too. It was fixed by Alexey Kondratov, in line
955 of 0005-Row-filtering-for-logical-replication.patch it should be
&TTSOpsHeapTuple instead of &TTSOpsVirtual.
Ops... exact. That was an oversight while poking with different types
of slots.
OK, I'll consider Alexey Kondratov's set of patches as the current
state-of-the-art then. (They still apply.)
I found a problem where I'm not sure it's a bug:
The attached bash script does a test by setting up pgbench tables on
both master and replica, and then sets up logical replication for a
slice of pgbench_accounts. Then it does a short pgbench run, and loops
until the results become identical(ok) (or breaks out after a certain
time (NOK=not ok)).
It turns out this did not work until I added a wait state after the
CREATE SUBSCRIPTION. It always fails without the wait state, and always
works with the wait state.
Do you agree this is a bug?
thanks (also to both Alexeys :))
Erik Rijkers
PS
by the way, this script won't run as-is on other machines; it has stuff
particular to my local setup.
#!/bin/bash
# postgres binary compiled with
#
# pgpatches/0130/logrep_rowfilter/20190902/v2-0001-Remove-unused-atttypmod-column-from-initial-table.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0002-Store-number-of-tuples-in-WalRcvExecResult.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0003-Refactor-function-create_estate_for_relation.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0004-Rename-a-WHERE-node.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0005-Row-filtering-for-logical-replication.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0006-Print-publication-WHERE-condition-in-psql.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0007-Publication-where-condition-support-for-pg_dump.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0008-Debug-for-row-filtering.patch
# pgpatches/0130/logrep_rowfilter/20190902/v2-0009-Add-simple-BDR-test-for-row-filtering.patch
unset PGDATABASE PGPORT PGSERVICE
export PGDATABASE=postgres
root_dir=/tmp/cascade/logrep_rowfilter
mkdir -p $root_dir
BIN=$HOME/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin
export PATH=$BIN:$PATH
initdb=$BIN/initdb
postgres=$BIN/postgres
pg_ctl=$BIN/pg_ctl
baseport=6525
port1=$(( $baseport + 0 ))
port2=$(( $baseport + 1 ))
appname=rowfilter
num_instances=2
scale=1 where="where (aid between 40000 and 50000-1)"
# scale=10 where="where (aid between 400000 and 400000+50000-1)"
clients=64
duration=20
wait=10 BASTA_COUNT=40 # 7200 # wait seconds in total
if [[ -d $root_dir/instance1 ]]; then rm -rf $root_dir/instance1; fi
if [[ -d $root_dir/instance2 ]]; then rm -rf $root_dir/instance2; fi
if [[ -d $root_dir/instance1 ]]; then exit ; fi
if [[ -d $root_dir/instance2 ]]; then exit ; fi
devel_file=/tmp/bugs
echo filterbug>$devel_file
for n in `seq 1 $num_instances`
do
instance=instance$n
server_dir=$root_dir/$instance
data_dir=$server_dir/data
port=$(( $baseport + $n -1 ))
logfile=$server_dir/logfile.$port
echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file "
$initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file &> /dev/null
( $postgres -D $data_dir -p $port \
--wal_level=logical --logging_collector=on \
--client_min_messages=warning \
--log_directory=$server_dir --log_filename=logfile.${port} \
--log_replication_commands=on & ) &> /dev/null
done
echo "sleep 3s"
sleep 3
echo "
drop table if exists pgbench_accounts;
drop table if exists pgbench_branches;
drop table if exists pgbench_tellers;
drop table if exists pgbench_history;" | psql -qXp $port1 \
&& echo "
drop table if exists pgbench_accounts;
drop table if exists pgbench_branches;
drop table if exists pgbench_tellers;
drop table if exists pgbench_history;" | psql -qXp $port2 \
&& pgbench -p $port1 -qis $scale \
&& echo "alter table pgbench_history add column hid serial primary key;" \
| psql -q1Xp $port1 && pg_dump -F c -p $port1 \
--exclude-table-data=pgbench_history \
--exclude-table-data=pgbench_accounts \
--exclude-table-data=pgbench_branches \
--exclude-table-data=pgbench_tellers \
-t pgbench_history -t pgbench_accounts \
-t pgbench_branches -t pgbench_tellers \
| pg_restore -1 -p $port2 -d postgres
pub1=pub_${port1}_to_${port2}
sub1=sub_${port2}_fr_${port1}
echo -ne "
create publication $pub1;
alter publication $pub1 add table pgbench_accounts $where ; --> where
alter publication $pub1 add table pgbench_branches;
alter publication $pub1 add table pgbench_tellers;
alter publication $pub1 add table pgbench_history;
" | psql -p $port1 -aqtAX
### sleep 10 # no need for a wait
echo "
create subscription $sub1 connection 'port=$port1 application_name=$appname'
publication $pub1 with(enabled=false);
alter subscription $sub1 enable;" | psql -p $port2 -aqtAX
sleep 1 # WAIT NECESSARY
echo "-- pgbench -p $port1 -c $clients -j 8 -T $duration -n postgres # scale $scale"
pgbench -p $port1 -c $clients -j 8 -T $duration -n postgres # scale $scale
echo
echo " accounts branches tellers history"
echo " --------- --------- --------- ---------"
sql_a="select * from pgbench_accounts $where order by aid;"
sql_b="select * from pgbench_branches order by bid;"
sql_t="select * from pgbench_tellers order by tid;"
sql_h="select * from pgbench_history order by hid;"
sqlc_a="select count(*) from pgbench_accounts $where;"
sqlc_b="select count(*) from pgbench_branches ;"
sqlc_t="select count(*) from pgbench_tellers ;"
sqlc_h="select count(*) from pgbench_history ;"
count=0
while [[ 1 -eq 1 ]]
do
secs=$(( $count * $wait ))
count=$(( $count + 1 ))
# different values otherwise dontcare
md5_6515=abc
md5_6516=xyz
md5_a=$(echo "$sql_a" | psql -qtAXp $port1 | md5sum | cut -b1-9)
md5_b=$(echo "$sql_b" | psql -qtAXp $port1 | md5sum | cut -b1-9)
md5_t=$(echo "$sql_t" | psql -qtAXp $port1 | md5sum | cut -b1-9)
md5_h=$(echo "$sql_h" | psql -qtAXp $port1 | md5sum | cut -b1-9)
cnt_a=$(echo "$sqlc_a" | psql -qtAXp $port1)
cnt_b=$(echo "$sqlc_b" | psql -qtAXp $port1)
cnt_t=$(echo "$sqlc_t" | psql -qtAXp $port1)
cnt_h=$(echo "$sqlc_h" | psql -qtAXp $port1)
md5_1=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
# md5_1=$( echo "$md5_a $md5_b $md5_t" | md5sum | cut -b1-9)
echo "$port1 $md5_a $md5_b $md5_t $md5_h $md5_1 $cnt_a $cnt_b $cnt_t $cnt_h"
# echo "$port1 $md5_a $md5_b $md5_t $md5_1 $cnt_a $cnt_b $cnt_t $cnt_h"
md5_a=$(echo "$sql_a" | psql -qtAXp $port2 | md5sum | cut -b1-9)
md5_b=$(echo "$sql_b" | psql -qtAXp $port2 | md5sum | cut -b1-9)
md5_t=$(echo "$sql_t" | psql -qtAXp $port2 | md5sum | cut -b1-9)
md5_h=$(echo "$sql_h" | psql -qtAXp $port2 | md5sum | cut -b1-9)
cnt_a=$(echo "$sqlc_a" | psql -qtAXp $port2)
cnt_b=$(echo "$sqlc_b" | psql -qtAXp $port2)
cnt_t=$(echo "$sqlc_t" | psql -qtAXp $port2)
cnt_h=$(echo "$sqlc_h" | psql -qtAXp $port2)
md5_2=$( echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
# md5_2=$( echo "$md5_a $md5_b $md5_t" | md5sum | cut -b1-9)
echo "$port2 $md5_a $md5_b $md5_t $md5_h $md5_2 $cnt_a $cnt_b $cnt_t $cnt_h ($count / $secs sec)"
# echo "$port2 $md5_a $md5_b $md5_t $md5_2 $cnt_a $cnt_b $cnt_t $cnt_h ($count / $secs sec)"
if [[ "$md5_1" == "$md5_2" ]]
then
echo " ok (2)"
break
fi
if [[ $secs -gt $BASTA_COUNT ]]
then
echo " NOK but BASTA (count is $count, secs is $secs)"
# dump some detail of the pgbench_history diff:
for p in 6525 6526
do
echo "select current_setting('port'), count(*) from pgbench_history;" | psql -qX -d postgres -p $p && \
echo "select * from pgbench_history order by hid" | psql -qX -d postgres -p $p > out_port_$p.txt
done
break
fi
echo " NOK (${wait}s)"
echo
sleep $wait
done
echo
md5sum out_port_*.txt
echo
wc -l out_port_*.txt | grep -Ev total
echo
if [[ 1 -eq 1 ]]; then
# stop instances
for n in `seq 1 $num_instances`
do
instance=instance$n
server_dir=$root_dir/$instance
data_dir=$server_dir/data
port=$(( $baseport + $n - 1 ))
logfile=$server_dir/logfile.$port
$pg_ctl stop -D $data_dir --mode=fast --wait
done
fi
if [[ 0 -eq 1 ]]; then
# delete everything
echo "rm -rf /tmp/cascade/logrep_rowfilter/instance*"
rm -rf /tmp/cascade/logrep_rowfilter/instance*
fi