On Friday, August 9, 2024 7:21 PM Shlok Kyal <shlok.kyal....@gmail.com> wrote:

Hi,

> 
> In the v7 patch, I am looping through the reorder buffer of the current 
> committed
> transaction and storing all invalidation messages in a list. Then I am
> distributing those invalidations.
> But I found that for a transaction we already store all the invalidation 
> messages
> (see [1]). So we don't need to loop through the reorder buffer and store the
> invalidations.
> 
> I have modified the patch accordingly and attached the same.

I have tested this patch across various scenarios and did not find any issues.

I confirmed that changes are correctly replicated after adding the table or
schema to the publication, and that changes are no longer replicated after
removing the table or schema from the publication. This behavior is consistent
in both streaming and non-streaming modes. Additionally, I verified that
invalidations occurring within subtransactions are appropriately distributed.

Please refer to the attached ISOLATION tests, which cover the above cases.
This also makes me wonder whether it would be cheaper to write an ISOLATION test
for this bug instead of building a real pub/sub cluster. That said, I am not
against the current tests in the V8 patch, as they check the replicated data in
a visible way.

Best Regards,
Hou zj
From 8f4e36c5fc65d4a88058467a73cbe423a5f0e91e Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.f...@cn.fujitsu.com>
Date: Mon, 9 Sep 2024 19:56:18 +0800
Subject: [PATCH] test invalidation distribution

---
 contrib/test_decoding/Makefile                |   2 +-
 .../expected/invalidation_distrubution.out    | 173 ++++++++++++++++++
 .../specs/invalidation_distrubution.spec      |  56 ++++++
 3 files changed, 230 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509a..eef7077067 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
        oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
        twophase_snapshot slot_creation_error catalog_change_snapshot \
-       skip_snapshot_restore
+       skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out 
b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 0000000000..cdc871d31d
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,173 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin 
s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 
s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM 
pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init    
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin 
s1_insert_tbl1 s2_alter_pub_add_schema s1_commit s1_insert_tbl1 
s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM 
pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init    
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_schema: ALTER PUBLICATION pub ADD TABLES IN SCHEMA 
public;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s2_alter_pub_add_tbl s1_begin 
s1_insert_tbl1 s2_alter_pub_drop_tbl s1_commit s1_insert_tbl1 
s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM 
pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init    
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_drop_tbl: ALTER PUBLICATION pub DROP TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s2_alter_pub_add_schema s1_begin 
s1_insert_tbl1 s2_alter_pub_drop_schema s1_commit s1_insert_tbl1 
s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM 
pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init    
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s2_alter_pub_add_schema: ALTER PUBLICATION pub ADD TABLES IN SCHEMA 
public;
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_drop_schema: ALTER PUBLICATION pub DROP TABLES IN SCHEMA 
public;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin 
s1_insert_tbl1 s2_begin s2_savepoint s2_alter_pub_add_tbl s2_commit s1_commit 
s1_insert_tbl1 s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM 
pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init    
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_begin: BEGIN;
+step s2_savepoint: SAVEPOINT s1;
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s2_commit: COMMIT;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s2_create_pub s1_init s1_insert_tbl1 s1_begin 
s1_insert_tbl1 s2_set_streaming_mode s2_alter_pub_add_tbl 
s2_get_binary_stream_changes s1_insert_tbl1 s1_commit 
s2_get_binary_stream_changes s2_drop_pub
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_init: SELECT 'init' FROM 
pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init    
+(1 row)
+
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_set_streaming_mode: SET debug_logical_replication_streaming = 
immediate;
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s2_get_binary_stream_changes: SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE 
get_byte(data, 0) = 73;
+count
+-----
+    0
+(1 row)
+
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_commit: COMMIT;
+step s2_get_binary_stream_changes: SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE 
get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec 
b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 0000000000..0d3d328250
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,56 @@
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_init" { SELECT 'init' FROM 
pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); }
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_begin" { BEGIN; }
+step "s2_savepoint" { SAVEPOINT s1; }
+step "s2_set_streaming_mode" { SET debug_logical_replication_streaming = 
immediate; }
+step "s2_create_pub" { CREATE PUBLICATION pub; }
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_alter_pub_drop_tbl" { ALTER PUBLICATION pub DROP TABLE tbl1; }
+step "s2_alter_pub_add_schema" { ALTER PUBLICATION pub ADD TABLES IN SCHEMA 
public; }
+step "s2_alter_pub_drop_schema" { ALTER PUBLICATION pub DROP TABLES IN SCHEMA 
public; }
+step "s2_drop_pub" { DROP PUBLICATION pub; }
+
+
+step "s2_get_binary_stream_changes" { SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE 
get_byte(data, 0) = 73; }
+step "s2_commit" { COMMIT; }
+
+step "s2_get_binary_changes" { SELECT count(data) FROM 
pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 
'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; 
}
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" 
"s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" 
"s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" 
"s1_insert_tbl1" "s2_alter_pub_add_schema" "s1_commit" "s1_insert_tbl1" 
"s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s2_alter_pub_add_tbl" "s1_begin" 
"s1_insert_tbl1" "s2_alter_pub_drop_tbl" "s1_commit" "s1_insert_tbl1" 
"s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s2_alter_pub_add_schema" "s1_begin" 
"s1_insert_tbl1" "s2_alter_pub_drop_schema" "s1_commit" "s1_insert_tbl1" 
"s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" 
"s1_insert_tbl1" "s2_begin" "s2_savepoint" "s2_alter_pub_add_tbl" "s2_commit" 
"s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s2_create_pub" "s1_init" "s1_insert_tbl1" "s1_begin" 
"s1_insert_tbl1" "s2_set_streaming_mode" "s2_alter_pub_add_tbl" 
"s2_get_binary_stream_changes" "s1_insert_tbl1" "s1_commit" 
"s2_get_binary_stream_changes" "s2_drop_pub"
-- 
2.30.0.windows.2

Reply via email to