On 2018-03-01 16:27, Erik Rijkers wrote:
On 2018-03-01 00:03, Euler Taveira wrote:
The attached patches add support for filtering rows in the publisher.
0001-Refactor-function-create_estate_for_relation.patch
0002-Rename-a-WHERE-node.patch
0003-Row-filtering-for-logical-replication.patch
Comments?
Very, very useful. I really do hope this patch survives the
late-arrival-cull.
I built this functionality into a test program I have been using and
in simple cascading replication tests it works well.
I did find what I think is a bug (a bug easy to avoid but also easy to
run into):
The test I used was to cascade 3 instances (all on one machine) from
A->B->C
I ran a pgbench session in instance A, and used:
in A: alter publication pub0_6515 add table pgbench_accounts where
(aid between 40000 and 60000-1);
in B: alter publication pub1_6516 add table pgbench_accounts;
The above worked well, but when I did the same but used the filter in
both publications:
in A: alter publication pub0_6515 add table pgbench_accounts where
(aid between 40000 and 60000-1);
in B: alter publication pub1_6516 add table pgbench_accounts where
(aid between 40000 and 60000-1);
then the replication only worked for (pgbench-)scale 1 (hence: very
little data); with larger scales it became slow (taking many minutes
where the above had taken less than 1 minute), and ended up using far
too much memory (or blowing up/crashing altogether). Something not
quite right there.
Nevertheless, I am much in favour of acquiring this functionality as
soon as possible.
Attached is 'logrep_rowfilter.sh', a demonstration of the above-described
bug.
The program runs initdb for 3 instances in /tmp (using ports 6515, 6516,
and 6517) and sets up logical replication from 1->2->3.
It can be made to work by removing the where-clause on the second 'create
publication' (i.e., comment out the $where2 variable).
Thanks,
Erik Rijkers
#!/bin/bash
#
# Sets up cascading logical replication (instance 1 -> 2 -> 3) to exercise
# publication row filters.  bash is required: the script relies on [[ ]],
# &> redirection and 'echo -ne', none of which plain POSIX sh guarantees.
#
# Needs a postgres binary built with:
#
# 0001-Refactor-function-create_estate_for_relation.patch
# 0002-Rename-a-WHERE-node.patch
# 0003-Row-filtering-for-logical-replication.patch
#

# Start from a clean libpq environment, then pin the default database.
unset PGDATABASE PGPORT PGSERVICE
export PGDATABASE=postgres

# pgbench scale factor used when initialising the test tables.
scale=10

# Parent directory that holds one subdirectory per instance.
root_dir=/tmp/cascade

# Location of the patched PostgreSQL binaries; also put first in PATH so that
# psql, pgbench, pg_dump and pg_restore come from the same installation.
BIN="$HOME/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast"
export PATH="$BIN:$PATH"
initdb="$BIN/initdb"
postgres="$BIN/postgres"
pg_ctl="$BIN/pg_ctl"

# Port of the first instance (the rest of the script spells 6515 literally).
baseport=6515
# Remove instance directories left over from a previous run.  If one of them
# cannot be removed, stop with an error message and a non-zero status rather
# than continuing on top of stale data.  The :? guard refuses to run rm -rf
# with an empty root_dir.
for stale_n in 1 2 3; do
  stale_dir="${root_dir:?root_dir must be set}/instance$stale_n"
  if [[ -d "$stale_dir" ]]; then
    rm -rf -- "$stale_dir"
  fi
  if [[ -d "$stale_dir" ]]; then
    echo "cannot remove stale directory $stale_dir" >&2
    exit 1
  fi
done
# File passed to initdb via --pwfile.
devel_file=/tmp/bugs
echo filterbug > "$devel_file"

# Number of instances in the cascade (the checks further down handle 2 or 3).
num_instances=3

# Phase 1: create every cluster.  initdb's output is discarded, so its exit
# status is the only sign of failure; abort here instead of letting the
# script run on against a cluster that does not exist.  All clusters are
# created before any server is started so that an abort leaves nothing running.
for (( n = 1; n <= num_instances; n++ )); do
  data_dir=$root_dir/instance$n/data
  echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file "
  if ! "$initdb" --pgdata="$data_dir" --encoding=UTF8 --pwfile="$devel_file" &> /dev/null; then
    echo "initdb failed for $data_dir" >&2
    exit 1
  fi
done

# Phase 2: start one server per cluster on consecutive ports from 6515.
# Each server is launched from a subshell in the background with its
# stdout/stderr discarded; logging goes to $server_dir/logfile.<port>.
for (( n = 1; n <= num_instances; n++ )); do
  instance=instance$n
  server_dir=$root_dir/$instance
  data_dir=$server_dir/data
  port=$(( 6515 + n - 1 ))
  ( "$postgres" -D "$data_dir" -p "$port" \
      --wal_level=logical --logging_collector=on \
      --client_min_messages=warning \
      --log_directory="$server_dir" --log_filename="logfile.${port}" \
      --log_replication_commands=on & ) &> /dev/null
done

# Give the servers a moment to come up before connecting.
echo "sleep 3s"
sleep 3
# SQL that removes any pgbench tables already present on an instance.
drop_pgbench_sql="
drop table if exists pgbench_accounts;
drop table if exists pgbench_branches;
drop table if exists pgbench_tellers;
drop table if exists pgbench_history;"

# pg_dump arguments: custom format, from 6515, the four pgbench tables with
# their row data excluded.
schema_dump_args=(
  -F c -p 6515
  --exclude-table-data=pgbench_history
  --exclude-table-data=pgbench_accounts
  --exclude-table-data=pgbench_branches
  --exclude-table-data=pgbench_tellers
  -t pgbench_history -t pgbench_accounts
  -t pgbench_branches -t pgbench_tellers
)

# Prepare instances 1 and 2: drop old tables on both, initialise pgbench on
# 6515, give pgbench_history a primary key, then copy the table definitions
# to 6516.  Every step runs only if the previous one succeeded; a failure
# ends the sequence but not the script.
seed_schema() {
  echo "$drop_pgbench_sql" | psql -qXp 6515 || return
  echo "$drop_pgbench_sql" | psql -qXp 6516 || return
  pgbench -p 6515 -qis $scale || return
  echo "alter table pgbench_history add column hid serial primary key;" \
    | psql -q1Xp 6515 || return
  pg_dump "${schema_dump_args[@]}" | pg_restore -1 -p 6516 -d postgres
}
seed_schema
# application_name used in the subscription connection strings.
appname=rowfilter

# Row filters for pgbench_accounts: $where for the publication on 6515,
# $where2 for the publication on 6516.
where="where (aid between 40000 and 60000-1)"
where2="where (aid between 40000 and 60000-1)"

# Publication on the first instance; only pgbench_accounts carries a filter.
pub1_sql="
create publication pub1;
alter publication pub1 add table pgbench_accounts $where ; --> where 1
alter publication pub1 add table pgbench_branches;
alter publication pub1 add table pgbench_tellers;
alter publication pub1 add table pgbench_history;
"
psql -p 6515 -aqtAX <<<"$pub1_sql"
# With three instances, the second one (6516) publishes onward to the third
# (6517): copy the table definitions to 6517, then create pub2 on 6516.
if (( num_instances == 3 )); then
  pgbench_tables=(pgbench_history pgbench_accounts pgbench_branches pgbench_tellers)

  # Build the pg_dump argument list: exclude row data for every table, then
  # select those same tables.
  dump_args=(-F c -p 6515)
  for tbl in "${pgbench_tables[@]}"; do
    dump_args+=("--exclude-table-data=$tbl")
  done
  for tbl in "${pgbench_tables[@]}"; do
    dump_args+=(-t "$tbl")
  done
  pg_dump "${dump_args[@]}" | pg_restore -1 -p 6517 -d postgres

  # Second publication; pgbench_accounts uses the $where2 row filter.
  pub2_sql="
create publication pub2;
alter publication pub2 add table pgbench_accounts $where2 ; --> where 2
alter publication pub2 add table pgbench_branches;
alter publication pub2 add table pgbench_tellers;
alter publication pub2 add table pgbench_history;
"
  psql -p 6516 -aqtAX <<<"$pub2_sql"
fi
# Create a subscription in disabled state and then enable it.
#   $1 - subscription name
#   $2 - port of the publishing instance
#   $3 - publication name
#   $4 - port of the subscribing instance (where the SQL is run)
create_subscription() {
  echo "
create subscription $1 connection 'port=$2 application_name=$appname'
publication $3 with(enabled=false);
alter subscription $1 enable;" | psql -p "$4" -aqtAX
}

# Instance 2 subscribes to instance 1; with three instances, instance 3
# subscribes to instance 2.
create_subscription sub1 6515 pub1 6516
if (( num_instances == 3 )); then
  create_subscription sub2 6516 pub2 6517
fi
# Generate load on the first instance: print the command line, then run it.
bench_args=(-p 6515 -c 16 -j 8 -T 5 -n postgres)
printf '%s\n' "-- pgbench ${bench_args[*]} # scale $scale"
pgbench "${bench_args[@]}" # scale $scale

# Column header for the checksum table printed by the comparison loop.
printf '\n'
printf '%s\n' \
  " accounts branches tellers history" \
  " --------- --------- --------- ---------"
# Print the first 9 characters of the md5sum of a query's psql output.
#   $1 - port of the instance to query
#   $2 - SQL text
query_md5() {
  echo "$2" | psql -qtAXp "$1" | md5sum | cut -b1-9
}

# Checksum the four pgbench tables on the instance at port $1.  Results are
# left in the globals md5_a, md5_b, md5_t, md5_h; md5_all holds the checksum
# of those four values combined.
checksum_instance() {
  md5_a=$(query_md5 "$1" "select * from pgbench_accounts $where order by aid")
  md5_b=$(query_md5 "$1" "select * from pgbench_branches order by bid")
  md5_t=$(query_md5 "$1" "select * from pgbench_tellers order by tid")
  md5_h=$(query_md5 "$1" "select * from pgbench_history order by hid")
  md5_all=$(echo "$md5_a $md5_b $md5_t $md5_h" | md5sum | cut -b1-9)
}

# Poll every 10 seconds until all instances report the same checksums.
while true; do
  # Start from values that cannot compare equal.
  md5_6515=6515 md5_6516=6516 md5_6517=6517

  checksum_instance 6515
  md5_6515=$md5_all
  echo "6515 $md5_a $md5_b $md5_t $md5_h $md5_6515 "

  checksum_instance 6516
  md5_6516=$md5_all
  echo "6516 $md5_a $md5_b $md5_t $md5_h $md5_6516 "

  if (( num_instances == 3 )); then
    checksum_instance 6517
    md5_6517=$md5_all
    # No newline: the verdict is appended to this line.
    echo -ne "6517 $md5_a $md5_b $md5_t $md5_h $md5_6517 "
  else
    echo
  fi

  # Work out the verdict; it stays empty while the instances still differ.
  verdict=
  if [[ "$md5_6515" == "$md5_6516" ]]; then
    if (( num_instances == 2 )); then
      verdict=" ok (2)"
    elif (( num_instances == 3 )) && [[ "$md5_6515" == "$md5_6517" ]]; then
      verdict=" ok"
    fi
  fi

  if [[ -n "$verdict" ]]; then
    echo "$verdict"
    break
  fi

  echo " NOK"
  echo
  sleep 10
done
echo

# Stop every instance.  Immediate mode skips a clean shutdown, which is
# acceptable here because the data directories are deleted right afterwards.
for (( n = 1; n <= num_instances; n++ )); do
  data_dir=$root_dir/instance$n/data
  "$pg_ctl" stop -D "$data_dir" --mode=immediate --wait
done

# Delete everything.  Use root_dir rather than a hard-coded /tmp/cascade so
# the cleanup always matches where the instances were created; the :? guard
# refuses to run rm -rf with an empty root_dir.
echo "rm -rf $root_dir/instance*"
rm -rf -- "${root_dir:?root_dir must be set}"/instance*