On 2018-03-01 16:27, Erik Rijkers wrote:
On 2018-03-01 00:03, Euler Taveira wrote:
The attached patches add support for filtering rows in the publisher.

001-Refactor-function-create_estate_for_relation.patch
0002-Rename-a-WHERE-node.patch
0003-Row-filtering-for-logical-replication.patch

Comments?

Very, very useful. I really do hope this patch survives the late-arrival-cull.

I built this functionality into a test program I have been using and
in simple cascading replication tests it works well.

I did find what I think is a bug (a bug easy to avoid but also easy to
run into):
The test I used was to cascade 3 instances (all on one machine) from A->B->C
I ran a pgbench session in instance A, and used:
  in A: alter publication pub0_6515 add table pgbench_accounts where
(aid between 40000 and 60000-1);
  in B: alter publication pub1_6516 add table pgbench_accounts;

The above worked well, but when I did the same but used the filter in
both publications:
  in A: alter publication pub0_6515 add table pgbench_accounts where
(aid between 40000 and 60000-1);
  in B: alter publication pub1_6516 add table pgbench_accounts where
(aid between 40000 and 60000-1);

then the replication only worked for (pgbench-)scale 1 (hence: very
little data); with larger scales it became slow (taking many minutes
where the above had taken less than 1 minute), and ended up using far
too much memory (or blowing up/crashing altogether).  Something not
quite right there.

Nevertheless, I am much in favour of acquiring this functionality as
soon as possible.


Attached is 'logrep_rowfilter.sh', a demonstration of the above-described bug.

The program runs initdb for 3 instances in /tmp (using ports 6515, 6516, and 6517) and sets up logical replication from 1->2->3.

It can be made to work by removing the WHERE clause from the second publication (pub2), i.e., by commenting out the $where2 variable.


Thanks,


Erik Rijkers
#!/bin/bash
#
# Reproduces a row-filter problem in cascading logical replication
# (instance 1 -> 2 -> 3).  Bash is required: the script uses bash-only
# syntax such as [[ ]] and &>, which a POSIX /bin/sh (e.g. dash) rejects
# or misparses.
#
# Needs a postgres binary built with:
#
#  0001-Refactor-function-create_estate_for_relation.patch
#  0002-Rename-a-WHERE-node.patch
#  0003-Row-filtering-for-logical-replication.patch
#
# Set ROWFILTER_BIN to that build's bin directory to override the default.

unset PGDATABASE PGPORT PGSERVICE
export PGDATABASE=postgres

scale=10

root_dir=/tmp/cascade

BIN=${ROWFILTER_BIN:-$HOME/pg_stuff/pg_installations/pgsql.logrep_rowfilter/bin.fast}

export PATH=$BIN:$PATH

  initdb=$BIN/initdb
postgres=$BIN/postgres
  pg_ctl=$BIN/pg_ctl
baseport=6515

# Remove data directories left over from a previous run.  If one cannot be
# removed, abort with a message and a non-zero status (a bare 'exit' would
# report success).  ${root_dir:?} refuses to run with an empty root_dir.
for n in 1 2 3; do
  old_dir=${root_dir:?}/instance$n
  if [[ -d $old_dir ]]; then rm -rf -- "$old_dir"; fi
  if [[ -d $old_dir ]]; then
    echo "cannot remove $old_dir" >&2
    exit 1
  fi
done

# Password file handed to initdb --pwfile for every instance.
devel_file=/tmp/bugs
echo filterbug > "$devel_file"

num_instances=3

# Create and start each instance on ports baseport, baseport+1, ...
# with wal_level=logical.  Server logs go to $server_dir/logfile.<port>
# through the logging collector.
for (( n = 1; n <= num_instances; n++ )); do
  server_dir=$root_dir/instance$n
  data_dir=$server_dir/data
  port=$(( baseport + n - 1 ))
  echo "-- $initdb --pgdata=$data_dir --encoding=UTF8 --pwfile=$devel_file "
  # initdb output is discarded, so check its status explicitly; otherwise
  # postgres would be started on a missing data directory and fail silently.
  if ! "$initdb" --pgdata="$data_dir" --encoding=UTF8 --pwfile="$devel_file" &> /dev/null; then
    echo "initdb failed for $data_dir" >&2
    exit 1
  fi
  # Started from a subshell so the server is not a job of this script.
  ( "$postgres" -D "$data_dir" -p "$port" \
      --wal_level=logical --logging_collector=on \
      --client_min_messages=warning \
      --log_directory="$server_dir" --log_filename="logfile.${port}" \
      --log_replication_commands=on & ) &> /dev/null
done

# Wait until every instance accepts connections (up to 30 s each) instead
# of sleeping a fixed 3 s, which races server start-up on a busy machine.
echo "waiting for instances to accept connections"
for (( n = 1; n <= num_instances; n++ )); do
  port=$(( baseport + n - 1 ))
  tries=0
  until pg_isready -q -p "$port"; do
    if (( ++tries >= 30 )); then
      echo "instance on port $port did not start" >&2
      exit 1
    fi
    sleep 1
  done
done

# Recreate the pgbench tables on instance 1, add a primary key to
# pgbench_history, and copy the schema (no data) to instance 2.
# Abort if any step fails rather than publishing from a broken setup.
echo "
  drop table if exists pgbench_accounts;
  drop table if exists pgbench_branches;
  drop table if exists pgbench_tellers;
  drop table if exists pgbench_history;" | psql -qXp 6515 \
&& echo "
  drop table if exists pgbench_accounts;
  drop table if exists pgbench_branches;
  drop table if exists pgbench_tellers;
  drop table if exists pgbench_history;" | psql -qXp 6516 \
&& pgbench -p 6515 -qis "$scale" \
&& echo "alter table pgbench_history add column hid serial primary key;" \
  | psql -q1Xp 6515 && pg_dump -F c -p 6515 \
     --exclude-table-data=pgbench_history  \
     --exclude-table-data=pgbench_accounts \
     --exclude-table-data=pgbench_branches \
     --exclude-table-data=pgbench_tellers  \
   -t pgbench_history -t pgbench_accounts \
   -t pgbench_branches -t pgbench_tellers \
  | pg_restore -1 -p 6516 -d postgres \
  || { echo "schema setup failed" >&2; exit 1; }

appname=rowfilter

# Row filters for pub1 (on instance 1) and pub2 (on instance 2).  Per the
# accompanying report, clearing where2 makes the cascade converge.
where="where (aid between 40000 and 60000-1)"
where2="where (aid between 40000 and 60000-1)"

# Publish the four pgbench tables from instance 1, accounts filtered.
# psql -a echoes its input, including the blank first and last lines.
psql -p 6515 -aqtAX <<EOF

create publication pub1;
alter publication pub1 add table pgbench_accounts $where ; --> where 1
alter publication pub1 add table pgbench_branches;
alter publication pub1 add table pgbench_tellers;
alter publication pub1 add table pgbench_history;

EOF

# Three-instance cascade: give instance 3 the same empty schema, then let
# instance 2 republish the tables through pub2 with its own row filter.
if (( num_instances == 3 )); then

  pg_dump -F c -p 6515 \
      --exclude-table-data=pgbench_history \
      --exclude-table-data=pgbench_accounts \
      --exclude-table-data=pgbench_branches \
      --exclude-table-data=pgbench_tellers \
      -t pgbench_history -t pgbench_accounts \
      -t pgbench_branches -t pgbench_tellers \
    | pg_restore -1 -p 6517 -d postgres

  # psql -a echoes every input line, including the blank first line and
  # the two-space last line, so both are emitted explicitly.
  printf '%s\n' \
    "" \
    "create publication pub2;" \
    "alter publication pub2 add table pgbench_accounts $where2 ; --> where 2" \
    "alter publication pub2 add table pgbench_branches;" \
    "alter publication pub2 add table pgbench_tellers;" \
    "alter publication pub2 add table pgbench_history;" \
    "  " \
    | psql -p 6516 -aqtAX
fi

# Subscribe instance 2 to pub1 on instance 1 and, for three instances,
# instance 3 to pub2 on instance 2.  Each subscription is created disabled
# and then enabled in a separate statement.
psql -p 6516 -aqtAX <<EOF

create subscription sub1 connection 'port=6515 application_name=$appname'
       publication pub1 with(enabled=false);
alter subscription sub1 enable;
EOF

if (( num_instances == 3 )); then
  psql -p 6517 -aqtAX <<EOF

create subscription sub2 connection 'port=6516 application_name=$appname'
       publication pub2 with(enabled=false);
alter subscription sub2 enable;
EOF

fi

# Show, then run, a 5-second pgbench load on instance 1; afterwards print
# the column header for the per-instance checksum rows.
printf '%s\n' "-- pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres    #  scale $scale"
pgbench -p 6515 -c 16 -j 8 -T 5 -n postgres
printf '\n'

printf '%s\n' \
  "       accounts  branches   tellers   history" \
  "       --------- --------- --------- ---------"

# Print the first 9 hex digits of the md5 of a query's unaligned output on
# the instance listening on port $1 (query text in $2).  pipefail and
# ON_ERROR_STOP make this fail when psql cannot connect or the query
# errors, instead of hashing empty output.  Empty output hashes the same on
# every instance and could otherwise be reported as "ok".
query_md5() (
  set -o pipefail
  echo "$2" | psql -v ON_ERROR_STOP=1 -qtAXp "$1" | md5sum | cut -b1-9
)

# Print "<accounts> <branches> <tellers> <history>" checksums for the
# instance on port $1.  A failed query becomes "ERR:<port>", which can
# never match another instance's value.
table_md5s() {
  local port=$1 md5_a md5_b md5_t md5_h
  md5_a=$(query_md5 "$port" "select * from pgbench_accounts $where order by aid") || md5_a=ERR:$port
  md5_b=$(query_md5 "$port" "select * from pgbench_branches order by bid") || md5_b=ERR:$port
  md5_t=$(query_md5 "$port" "select * from pgbench_tellers  order by tid") || md5_t=ERR:$port
  md5_h=$(query_md5 "$port" "select * from pgbench_history  order by hid") || md5_h=ERR:$port
  echo "$md5_a $md5_b $md5_t $md5_h"
}

# Poll every 10 s until all instances hold identical data (accounts compared
# only within $where), printing one checksum row per instance.  There is no
# timeout: while replication is broken the loop keeps printing NOK.
while true; do
  # Distinct placeholders so unchecked instances never compare equal.
  md5_6515=6515 md5_6516=6516 md5_6517=6517

  sums=$(table_md5s 6515)
  md5_6515=$(echo "$sums" | md5sum | cut -b1-9)
  echo "6515   $sums    $md5_6515 "

  sums=$(table_md5s 6516)
  md5_6516=$(echo "$sums" | md5sum | cut -b1-9)
  echo "6516   $sums    $md5_6516 "

  if [[ $num_instances -eq 3 ]]; then
    sums=$(table_md5s 6517)
    md5_6517=$(echo "$sums" | md5sum | cut -b1-9)
    # No newline: the ok/NOK verdict is appended to this row.
    printf '%s' "6517   $sums    $md5_6517 "
  else
    echo
  fi

  if [[ "$md5_6515" == "$md5_6516" ]]; then
    if [[ $num_instances -eq 2 ]]; then
      echo "   ok (2)"
      break
    elif [[ $num_instances -eq 3 && "$md5_6515" == "$md5_6517" ]]; then
      echo "   ok"
      break
    fi
  fi
  echo "  NOK"
  echo
  sleep 10
done
echo

# Stop every instance in immediate mode; its data directory is deleted
# afterwards, so a clean shutdown is not needed.
for (( n = 1; n <= num_instances; n++ )); do
  data_dir=$root_dir/instance$n/data
  "$pg_ctl" stop -D "$data_dir" --mode=immediate --wait
done

# Delete the instance directories under root_dir (the same location the
# setup created them in).  ${root_dir:?} aborts rather than letting the
# pattern expand to "/instance*" if root_dir were ever empty.
echo "rm -rf ${root_dir:?}/instance*"
rm -rf -- "${root_dir:?}"/instance*


Reply via email to