Hi Hackers,

Our team (mainly Shlok) ran performance tests with several workloads, and I
would like to share the results on -hackers. We tested both the master and
REL_17 branches; this post covers the results for master.

The observed trend is:
The performance regression appears primarily when publication DDL statements
that modify published tables are executed frequently. This is expected, given
the cache rebuild and invalidation distribution overhead involved.
The regression is small or nearly nonexistent when the DDLs do not affect
published tables, or when such DDL statements are infrequent.

Used source
========
The base code was HEAD plus some modifications that make it possible to
selectively invalidate relsync cache entries. These modifications have since
been pushed as 3abe9dc. The compared patch was v16.


We ran five benchmarks; let me share them one by one.

-----

Workload A: No DDL operation is done in the concurrent session
======================================
In this workload, the number of concurrent transactions was varied, but none of
them contained DDL commands. The time to decode all transactions was measured
and compared.
We expected the performance to be unchanged, because none of the caches would
be invalidated. The actual workload is described in [1] and the runner is
attached.

The table below shows the result. We could not find any notable degradation.

Concurrent txn     | Head (sec)    | Patch (sec)  | Degradation (%)
------------------ | ------------  | ------------ | ----------------
50                 | 0.013196      | 0.013314     | 0.8968
100                | 0.014531      | 0.014728     | 1.3558
500                | 0.018079      | 0.017969     | -0.6066
1000               | 0.023087      | 0.023772     | 2.9670
2000               | 0.031311      | 0.031750     | 1.4010
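
For readers who want to recheck the last column: the Degradation (%) values
appear to be the relative slowdown of the patched build against HEAD. Below is
a minimal sketch of that arithmetic in Python. The formula
(patch - head) / head * 100 is an assumption (it is not stated in this post),
and the function name degradation_pct is made up for illustration.

```python
def degradation_pct(head_sec: float, patch_sec: float) -> float:
    """Relative slowdown of the patch against HEAD, in percent.

    Assumed formula (not stated in the post): (patch - head) / head * 100.
    A negative value means the patched build was faster.
    """
    return (patch_sec - head_sec) / head_sec * 100.0


# Workload A timings copied from the table above: (txns, head, patch)
workload_a = [
    (50, 0.013196, 0.013314),
    (100, 0.014531, 0.014728),
    (500, 0.018079, 0.017969),
    (1000, 0.023087, 0.023772),
    (2000, 0.031311, 0.031750),
]

for txns, head, patch in workload_a:
    print(f"{txns:>5} {degradation_pct(head, patch):8.4f}")
```

Small differences in the last digits against the table are expected, since the
timings shown here are rounded to six decimal places.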

-----

Workload B: DDL is happening but is unrelated to publication
=======================================
In this workload, one of the concurrent transactions contained a DDL, but it was
not related to the publication or the published tables. We again expected the
performance to be unchanged.
The actual workload is described in [2] and the runner is attached.

The table below shows the result. The proposed patch distributes invalidation
messages to the concurrent transactions being decoded, so its cost should be
roughly proportional to the concurrency, and what we observed is consistent
with that. Since these invalidation messages do not invalidate relsync caches,
the difference is not so large.

Concurrent txn     | Head (sec)    | Patch (sec)  | Degradation (%) 
------------------ | ------------  | ------------ | ----------------
50                 | 0.013410      | 0.013217     | -1.4417
100                | 0.014694      | 0.015260     | 3.8496
1000               | 0.023211      | 0.025376     | 9.3289
2000               | 0.032954      | 0.036322     | 10.2213


-----

Workload C: DDL is happening on publication but on unrelated table
===========================================
In this workload, one of the concurrent transactions contained a DDL that
altered the publication in use, but it only dropped and re-added a table that
was not being decoded.
The actual workload is described in [3] and the runner is attached.

The table below shows the result. Since commit 3abe9dc, the whole relsync cache
no longer needs to be rebuilt for publication changes on unrelated tables. Thus
the degradation was mostly the same as in B.

Concurrent txn     | Head (sec)    | Patch (sec)  | Degradation (%)
------------------ | ------------  | ------------ | ----------------
50                 | 0.013546      | 0.013409     | -1.0089
100                | 0.015225      | 0.015357     | 0.8648
500                | 0.017848      | 0.019300     | 8.1372
1000               | 0.023430      | 0.025152     | 7.3497
2000               | 0.032041      | 0.035877     | 11.9723


-----

Workload D: DDL is happening on the related published table,
                        and one insert is done per invalidation
=========================================
In this workload, one of the concurrent transactions contained a DDL that
altered the publication in use, and it dropped and re-added the table that was
being decoded. The actual workload is described in [4] and the runner is
attached.

The table below shows the result. Unlike B and C, we expected this workload to
show a large degradation, because each distributed message requires the relsync
cache to be rebuilt. This means that the cache is discarded and rebuilt for
every transaction. The result showed a regression of around 300% for 2000
concurrent transactions.

IIUC it is difficult to avoid this regression with the current design.

Concurrent txn     | Head (sec)    | Patch (sec)  | Degradation (%)
------------------ | ------------  | ------------ | ----------------
50                 | 0.013944      | 0.016460     | 18.0384
100                | 0.014952      | 0.020160     | 34.8322
500                | 0.018535      | 0.043122     | 132.6577
1000               | 0.023426      | 0.072215     | 208.2628
2000               | 0.032055      | 0.131884     | 311.4314
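
One way to read the table above: if the cache is rebuilt once per transaction,
the extra decoding time should grow linearly with the number of transactions.
Below is a rough sketch in Python that divides the extra time by the
transaction count. The helper name extra_usec_per_txn is made up here, and
attributing the whole difference to cache rebuilds is an assumption, not
something we measured separately.

```python
def extra_usec_per_txn(txns: int, head_sec: float, patch_sec: float) -> float:
    """Extra decoding time per concurrent transaction, in microseconds.

    Assumption: the whole (patch - head) difference is spread evenly
    over the concurrent transactions.
    """
    return (patch_sec - head_sec) / txns * 1_000_000.0


# Workload D timings copied from the table above: (txns, head, patch)
workload_d = [
    (50, 0.013944, 0.016460),
    (100, 0.014952, 0.020160),
    (500, 0.018535, 0.043122),
    (1000, 0.023426, 0.072215),
    (2000, 0.032055, 0.131884),
]

for txns, head, patch in workload_d:
    print(f"{txns:>5} {extra_usec_per_txn(txns, head, patch):7.2f} usec/txn")
```

With these numbers the overhead stays at roughly 50 microseconds per
transaction in every row, which fits the explanation that the cost comes from
one rebuild per transaction.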

-----

Workload E: DDL is happening on the related published table,
                        and 1000 inserts are done per invalidation
===========================================
This workload was mostly the same as D, but the number of inserted tuples was
1000x larger. We expected that rebuilding caches would not be dominant in this
workload, so the regression would be small.
The actual workload is described in [5] and the runner is attached.

The table below shows the result. Unlike D, there was no large regression. This
is a reasonable result, because decoding the 1000x larger number of insertions
consumed most of the CPU time.

Concurrent txn     | Head (sec)    | Patch (sec)  | Degradation (%)
------------------ | ------------  | ------------ | ----------------
50                 | 0.093019      | 0.108820     | 16.9869
100                | 0.188367      | 0.199621     | 5.9741
500                | 0.967896      | 0.970674     | 0.2870
1000               | 1.658552      | 1.803991     | 8.7691
2000               | 3.482935      | 3.682771     | 5.7376


Thanks again to Shlok for measuring the data.

[1]:
1. Created a publication on a single table, say 'tab_conc1';
2. 'n + 1' sessions are running in parallel
3. Now:
     All 'n' sessions : 
        BEGIN;
        Insert a row in table 'tab_conc1';
     In a session : 
        Insert a row in table 'tab_conc1';
        Insert a row in table 'tab_conc1'
     All 'n' sessions : 
        Insert a row in table 'tab_conc1';
        COMMIT; 
4. run 'pg_logical_slot_get_binary_changes' to get the decoding changes.

[2]:
1. Created a publication on a single table, say 'tab_conc1';
2. 'n + 1' sessions are running in parallel
3. Now:
     All 'n' sessions : 
        BEGIN;
        Insert a row in table 'tab_conc1'
     In a session : 
        BEGIN; ALTER TABLE t1 ADD COLUMN b int; COMMIT;
        BEGIN; ALTER TABLE t1 DROP COLUMN b; COMMIT;
     All 'n' sessions : 
        Insert a row in table 'tab_conc1';
        COMMIT; 
4. run 'pg_logical_slot_get_binary_changes' to get the decoding changes.

[3]:
1. Created a publication on two tables, say 'tab_conc1' and 't1';
2. 'n + 1' sessions are running in parallel
3. Now:
      All 'n' sessions : 
        BEGIN; 
        Insert a row in table 'tab_conc1'
      In a session : 
        BEGIN; ALTER PUBLICATION regress_pub1 DROP TABLE t1; COMMIT;  
        BEGIN; ALTER PUBLICATION regress_pub1 ADD TABLE t1; COMMIT;
      All 'n' sessions : 
        Insert a row in table 'tab_conc1';
        COMMIT; 
4. run 'pg_logical_slot_get_binary_changes' to get the decoding changes.

[4]:
1. Created a publication on a single table, say 'tab_conc1';
2. 'n + 1' sessions are running in parallel 
3. Now:
     All 'n' sessions :
        BEGIN;
        Insert a row in table 'tab_conc1'
     In a session :
        BEGIN; ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc1; COMMIT;
        BEGIN; ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc1; COMMIT;
     All 'n' sessions : 
        Insert a row in table 'tab_conc1';
        COMMIT; 
4. run 'pg_logical_slot_get_binary_changes' to get the decoding changes.

[5]:
1. Created a publication on a single table, say 'tab_conc1';
2. 'n + 1' sessions are running in parallel
3. Now:
     All 'n' sessions : 
        BEGIN;
        Insert 1000 rows in table 'tab_conc1'
     In a session : 
        BEGIN; ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc1; COMMIT;
        BEGIN; ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc1; COMMIT;
     All 'n' sessions :
        Insert 1000 rows in table 'tab_conc1';
        COMMIT; 
4. run 'pg_logical_slot_get_binary_changes' to get the decoding changes.

Best regards,
Hayato Kuroda
FUJITSU LIMITED

# Runner for workload A: no DDL in the concurrent session
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use Time::HiRes;

# no. of concurrent txn
my $concurrent_txns = 2000;

# max_connections to be set
my $max_connections = $concurrent_txns + 100;

# Setup node
my $node = PostgreSQL::Test::Cluster->new('publisher');
$node->init(allows_streaming => 'logical');
$node->append_conf(
        'postgresql.conf', q[
shared_buffers = 40GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
logical_decoding_work_mem = 40GB
]);
$node->append_conf('postgresql.conf', "max_connections = $max_connections");
$node->start;

# table t1 is not part of any publication
$node->safe_psql(
        'postgres', qq(
        CREATE TABLE t1(a int);
        CREATE TABLE tab_conc1(a int);
        CREATE PUBLICATION regress_pub1 for table tab_conc1;
        SELECT * FROM pg_create_logical_replication_slot('regression_slot1', 'pgoutput');
));

my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;

my @background_psqls;

# create $concurrent_txns sessions
foreach my $i (1 .. $concurrent_txns)
{
    my $background_psql = $node->background_psql(
        'postgres',
        on_error_stop => 0,
        timeout => $psql_timeout_secs);

    push (@background_psqls, $background_psql);
}

# begin txn in each session
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe('BEGIN;');
    $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (11);]);
}

$node->safe_psql('postgres', 'INSERT INTO tab_conc1 VALUES (111);');
$node->safe_psql('postgres', 'INSERT INTO tab_conc1 VALUES (111);');

# Second insert and commit in each session; no invalidation happens in this workload
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (12);]);
    $background_psql->query_safe('COMMIT;');
    $background_psql->quit;
}

# Measure the decoding time via SQL interface - pg_logical_slot_get_binary_changes
my $start = Time::HiRes::gettimeofday();
my $result = $node->safe_psql('postgres',
"SELECT count(*) FROM pg_logical_slot_get_binary_changes('regression_slot1', NULL, NULL, 'proto_version', '4', 'publication_names', 'regress_pub1');");
my $end = Time::HiRes::gettimeofday();

printf("time elapsed %f\n", $end - $start);

is($result, 4*$concurrent_txns+7, 'changes were decoded properly');

$node->stop('fast');

done_testing();

# Runner for workload B: DDL on t1, which is unrelated to the publication
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use Time::HiRes;

# no. of concurrent txn
my $concurrent_txns = 50;

# max_connections to be set
my $max_connections = $concurrent_txns + 100;

# Setup node
my $node = PostgreSQL::Test::Cluster->new('publisher');
$node->init(allows_streaming => 'logical');
$node->append_conf(
        'postgresql.conf', q[
shared_buffers = 10GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
logical_decoding_work_mem = 10GB
]);
$node->append_conf('postgresql.conf', "max_connections = $max_connections");
$node->start;

# table t1 is not part of any publication
$node->safe_psql(
        'postgres', qq(
        CREATE TABLE t1(a int);
        CREATE TABLE tab_conc1(a int);
        CREATE PUBLICATION regress_pub1 for table tab_conc1;
        SELECT * FROM pg_create_logical_replication_slot('regression_slot1', 'pgoutput');
));

my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;

my @background_psqls;

# create $concurrent_txns sessions
foreach my $i (1 .. $concurrent_txns)
{
    my $background_psql = $node->background_psql(
        'postgres',
        on_error_stop => 0,
        timeout => $psql_timeout_secs);

    push (@background_psqls, $background_psql);
}

# begin txn in each session
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe('BEGIN;');
    $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (11);]);
}

# an invalidation message unrelated to the publication is distributed to all
# $concurrent_txns transactions
foreach my $i (1 .. 1)
{
    $node->safe_psql('postgres', 'ALTER TABLE t1 ADD COLUMN b int');
    $node->safe_psql('postgres', 'ALTER TABLE t1 DROP COLUMN b');
}

# Second insert and commit in each session; the DDL above touched only t1,
# which is not published
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (12);]);
    $background_psql->query_safe('COMMIT;');
    $background_psql->quit;
}

# Measure the decoding time via SQL interface - pg_logical_slot_get_binary_changes
my $start = Time::HiRes::gettimeofday();
my $result = $node->safe_psql('postgres',
"SELECT count(*) FROM pg_logical_slot_get_binary_changes('regression_slot1', NULL, NULL, 'proto_version', '4', 'publication_names', 'regress_pub1');");
my $end = Time::HiRes::gettimeofday();

printf("time elapsed %f\n", $end - $start);

is($result, 4*$concurrent_txns+1, 'changes were decoded properly');

$node->stop('fast');

done_testing();
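
# Runner for workload C: publication DDL on t1, which is published but not decoded
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use Time::HiRes;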

# no. of concurrent txn
my $concurrent_txns = 100;

# max_connections to be set
my $max_connections = $concurrent_txns + 100;

# Setup node
my $node = PostgreSQL::Test::Cluster->new('publisher');
$node->init(allows_streaming => 'logical');
$node->append_conf(
        'postgresql.conf', q[
shared_buffers = 10GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
logical_decoding_work_mem = 10GB
]);
$node->append_conf('postgresql.conf', "max_connections = $max_connections");
$node->start;

# tables tab_conc1 and t1 are both part of the publication
$node->safe_psql(
        'postgres', qq(
        CREATE TABLE t1(a int);
        CREATE TABLE tab_conc1(a int);
        CREATE PUBLICATION regress_pub1 for table tab_conc1,t1;
        SELECT * FROM pg_create_logical_replication_slot('regression_slot1', 'pgoutput');
));

my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;

my @background_psqls;

# create $concurrent_txns sessions
foreach my $i (1 .. $concurrent_txns)
{
    my $background_psql = $node->background_psql(
        'postgres',
        on_error_stop => 0,
        timeout => $psql_timeout_secs);

    push (@background_psqls, $background_psql);
}

# begin txn in each session
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe('BEGIN;');
    $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (11);]);
}

# an invalidation message for the table t1, which is not being decoded, is
# distributed to all $concurrent_txns transactions
foreach my $i (1 .. 1)
{
    $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 DROP TABLE t1;');
    $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 ADD TABLE t1;');
}

foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (12);]);
    $background_psql->query_safe('COMMIT;');
    $background_psql->quit;
}

# Measure the decoding time via SQL interface - pg_logical_slot_get_binary_changes
my $start = Time::HiRes::gettimeofday();
my $result = $node->safe_psql('postgres',
"SELECT count(*) FROM pg_logical_slot_get_binary_changes('regression_slot1', NULL, NULL, 'proto_version', '4', 'publication_names', 'regress_pub1');");
my $end = Time::HiRes::gettimeofday();

printf("time elapsed %f\n", $end - $start);

is($result, 4*$concurrent_txns+1, 'changes were decoded properly');

$node->stop('fast');

done_testing();
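
# Runner for workload D: publication DDL on the decoded table, one insert per invalidation
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use Time::HiRes;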

# no. of concurrent txn
my $concurrent_txns = 2000;

# max_connections to be set
my $max_connections = $concurrent_txns + 100;

# Setup node
my $node = PostgreSQL::Test::Cluster->new('publisher');
$node->init(allows_streaming => 'logical');
$node->append_conf(
        'postgresql.conf', q[
shared_buffers = 40GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
logical_decoding_work_mem = 40GB
]);
$node->append_conf('postgresql.conf', "max_connections = $max_connections");
$node->start;
$node->safe_psql(
        'postgres', qq(
        CREATE TABLE tab_conc1(a int);
        CREATE PUBLICATION regress_pub1 for table tab_conc1;
        SELECT * FROM pg_create_logical_replication_slot('regression_slot1', 'pgoutput');
));

my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;

my @background_psqls;

# create $concurrent_txns sessions
foreach my $i (1 .. $concurrent_txns)
{
    my $background_psql = $node->background_psql(
        'postgres',
        on_error_stop => 0,
        timeout => $psql_timeout_secs);

    push (@background_psqls, $background_psql);
}

# begin txn in each session
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe(qq[BEGIN;INSERT INTO tab_conc1 VALUES (11);]);
}

# invalidate message is distributed to all $concurrent_txns transactions
foreach my $i (1 .. 1)
{
    $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc1;');
    $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc1;');
}

# Insert will build cache for each txn as cache is invalidated for each txn in
# previous step
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (12); COMMIT;]);
    $background_psql->quit;
}

# Measure the decoding time via SQL interface - pg_logical_slot_get_binary_changes
my $start = Time::HiRes::gettimeofday();
my $result = $node->safe_psql('postgres',
"SELECT count(*) FROM pg_logical_slot_get_binary_changes('regression_slot1', NULL, NULL, 'proto_version', '4', 'publication_names', 'regress_pub1');");
my $end = Time::HiRes::gettimeofday();

printf("time elapsed %f\n", $end - $start);

# check with patch
#is($result, $concurrent_txns * 6 , 'changes were decoded properly');

# check with HEAD
is($result, $concurrent_txns * 4 + 1, 'changes were decoded properly');

$node->stop('fast');

done_testing();
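
# Runner for workload E: publication DDL on the decoded table, 1000 inserts per invalidation
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use Time::HiRes;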

# no. of concurrent txn
my $concurrent_txns = 2000;

# max_connections to be set
my $max_connections = $concurrent_txns + 100;

# Setup node
my $node = PostgreSQL::Test::Cluster->new('publisher');
$node->init(allows_streaming => 'logical');
$node->append_conf(
        'postgresql.conf', q[
shared_buffers = 40GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = on
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
logical_decoding_work_mem = 40GB
]);
$node->append_conf('postgresql.conf', "max_connections = $max_connections");
$node->start;
$node->safe_psql(
        'postgres', qq(
        CREATE TABLE tab_conc1(a int);
        CREATE PUBLICATION regress_pub1 for table tab_conc1;
        SELECT * FROM pg_create_logical_replication_slot('regression_slot1', 'pgoutput');
));

my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;

my @background_psqls;

# create $concurrent_txns sessions
foreach my $i (1 .. $concurrent_txns)
{
    my $background_psql = $node->background_psql(
        'postgres',
        on_error_stop => 0,
        timeout => $psql_timeout_secs);

    push (@background_psqls, $background_psql);
}

# begin txn in each session
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe(qq[
BEGIN;
DO \$\$
BEGIN
    FOR cnt IN 1..1000 LOOP
        INSERT INTO tab_conc1 VALUES (11);
    END LOOP;
END; \$\$;]);
}

# invalidate message is distributed to all $concurrent_txns transactions
foreach my $i (1 .. 1)
{
    $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 DROP TABLE tab_conc1;');
    $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 ADD TABLE tab_conc1;');
}

# Insert will build cache for each txn as cache is invalidated for each txn in
# previous step
foreach my $background_psql (@background_psqls)
{
    $background_psql->query_safe(qq[
DO \$\$
BEGIN
    FOR cnt IN 1..1000 LOOP
        INSERT INTO tab_conc1 VALUES (12);
    END LOOP;
END; \$\$;
COMMIT;]);
    $background_psql->quit;
}

# Measure the decoding time via SQL interface - pg_logical_slot_get_binary_changes
my $start = Time::HiRes::gettimeofday();
my $result = $node->safe_psql('postgres',
"SELECT count(*) FROM pg_logical_slot_get_binary_changes('regression_slot1', NULL, NULL, 'proto_version', '4', 'publication_names', 'regress_pub1');");
my $end = Time::HiRes::gettimeofday();

printf("time elapsed %f\n", $end - $start);

# check with patch
#is($result, $concurrent_txns * 2004 , 'changes were decoded properly');

# check with HEAD
is($result, $concurrent_txns * 2002 + 1, 'changes were decoded properly');

$node->stop('fast');

done_testing();
