On Friday, August 9, 2024 7:21 PM Shlok Kyal <shlok.kyal....@gmail.com> wrote:
> Hi,
>
> In the v7 patch, I am looping through the reorder buffer of the current committed
> transaction and storing all invalidation messages in a list. Then I am
> distributing those invalidations.
> But I found that for a transaction we already store all the invalidation messages
> (see [1]). So we don't need to loop through the reorder buffer and store the
> invalidations.
>
> I have modified the patch accordingly and attached the same.

I have tested this patch across various scenarios and did not find any issues.

I confirmed that changes are correctly replicated after adding the table or
schema to the publication, and that changes are not replicated after removing
the table or schema from the publication. This behavior is consistent in both
streaming and non-streaming modes. Additionally, I verified that invalidations
occurring within subtransactions are appropriately distributed.

Please refer to the attached ISOLATION tests, which cover the above cases.
This also makes me wonder whether it would be cheaper to write an ISOLATION
test for this bug instead of building a real pub/sub cluster. That said, I am
not against the current tests in the V8 patch, since they check the replicated
data in a visible way.

Best Regards,
Hou zj
From 8f4e36c5fc65d4a88058467a73cbe423a5f0e91e Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.f...@cn.fujitsu.com>
Date: Mon, 9 Sep 2024 19:56:18 +0800
Subject: [PATCH] test invalidation distribution

---
 contrib/test_decoding/Makefile                |   2 +-
 .../expected/invalidation_distrubution.out    | 173 ++++++++++++++++++
 .../specs/invalidation_distrubution.spec      |  56 ++++++
 3 files changed, 230 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509a..eef7077067 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
-	skip_snapshot_restore
+	skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 0000000000..cdc871d31d
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,173 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_schema s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_schema: ALTER PUBLICATION pub ADD TABLES IN SCHEMA public;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s2_alter_pub_add_tbl s1_begin s1_insert_tbl1 s2_alter_pub_drop_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_drop_tbl: ALTER PUBLICATION pub DROP TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s2_alter_pub_add_schema s1_begin s1_insert_tbl1 s2_alter_pub_drop_schema s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s2_alter_pub_add_schema: ALTER PUBLICATION pub ADD TABLES IN SCHEMA public;
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_drop_schema: ALTER PUBLICATION pub DROP TABLES IN SCHEMA public;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop
+(1 row)
+
+
+starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_begin s2_savepoint s2_alter_pub_add_tbl s2_commit s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init
+(1 row)
+
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_begin: BEGIN;
+step s2_savepoint: SAVEPOINT s1;
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s2_commit: COMMIT;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop
+(1 row)
+
+
+starting permutation: s2_create_pub s1_init s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_set_streaming_mode s2_alter_pub_add_tbl s2_get_binary_stream_changes s1_insert_tbl1 s1_commit s2_get_binary_stream_changes s2_drop_pub
+step s2_create_pub: CREATE PUBLICATION pub;
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+?column?
+--------
+init
+(1 row)
+
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_set_streaming_mode: SET debug_logical_replication_streaming = immediate;
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s2_get_binary_stream_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE get_byte(data, 0) = 73;
+count
+-----
+    0
+(1 row)
+
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_commit: COMMIT;
+step s2_get_binary_stream_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+step s2_drop_pub: DROP PUBLICATION pub;
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 0000000000..0d3d328250
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,56 @@
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); }
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_begin" { BEGIN; }
+step "s2_savepoint" { SAVEPOINT s1; }
+step "s2_set_streaming_mode" { SET debug_logical_replication_streaming = immediate; }
+step "s2_create_pub" { CREATE PUBLICATION pub; }
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_alter_pub_drop_tbl" { ALTER PUBLICATION pub DROP TABLE tbl1; }
+step "s2_alter_pub_add_schema" { ALTER PUBLICATION pub ADD TABLES IN SCHEMA public; }
+step "s2_alter_pub_drop_schema" { ALTER PUBLICATION pub DROP TABLES IN SCHEMA public; }
+step "s2_drop_pub" { DROP PUBLICATION pub; }
+
+
+step "s2_get_binary_stream_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE get_byte(data, 0) = 73; }
+step "s2_commit" { COMMIT; }
+
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_schema" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s2_alter_pub_add_tbl" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_drop_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s2_alter_pub_add_schema" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_drop_schema" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_begin" "s2_savepoint" "s2_alter_pub_add_tbl" "s2_commit" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub"
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s2_create_pub" "s1_init" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_set_streaming_mode" "s2_alter_pub_add_tbl" "s2_get_binary_stream_changes" "s1_insert_tbl1" "s1_commit" "s2_get_binary_stream_changes" "s2_drop_pub"
--
2.30.0.windows.2
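
A side note on the `get_byte(data, 0) = 73` filter used in the spec above: 73 is the ASCII code of 'I', the message-type byte that the spec comments name as LOGICAL_REP_MSG_INSERT, so each step counts only the INSERT messages returned by the slot. The Python sketch below mirrors that filter outside the database. The function name `count_inserts` and the sample payloads are hypothetical and for illustration only; real pgoutput messages carry more data after the first byte.

```python
# First byte of an INSERT message, per the spec comment LOGICAL_REP_MSG_INSERT = 'I'.
LOGICAL_REP_MSG_INSERT = ord("I")  # 73, the value compared against get_byte(data, 0)


def count_inserts(messages):
    """Count messages whose first byte is 'I' (hypothetical helper).

    This mirrors: SELECT count(data) ... WHERE get_byte(data, 0) = 73
    """
    return sum(1 for msg in messages if msg and msg[0] == LOGICAL_REP_MSG_INSERT)


# Made-up payloads: only the leading type byte matters for this filter.
# 'B' = begin, 'R' = relation, 'I' = insert, 'C' = commit.
sample = [b"B....", b"R....", b"I....", b"C...."]
print(count_inserts(sample))
```

With one INSERT message among the sample payloads, the count matches the single row the permutations expect once the publication change is visible to the decoding session.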