Dear Ajin, Hou,

> - Snapshot construction

I understand the approach: we do not try to filter for all workloads, only on a best-effort basis. I played with your PoC, and here are my comments.

1. Can you add tests to make the behavior easier to understand? I tried adding one to test_decoding, as attached, but the regression tests did not pass: the "stream.sql" and "twophase_stream.sql" cases failed.

2. I think we can extend the skip mechanism to UPDATE/DELETE/MultiInsert/SpecConfirm. As for TRUNCATE, I'm not sure we can handle it, because we can no longer track the RelFileLocator.

3. Both ReorderBuffer and RelFileLocatorFilterCache have the same lifetime, but RelFileLocatorFilterCache is provided as a global variable, which is quite strange. Can you somehow avoid this? I considered one idea but could not make it work, because we do not have an API to unregister a relcache callback.

4. Output plugins that do not cache the catalog may not be able to filter outside a transaction. For them, the early filter may degrade performance, because it requires opening a transaction for every change. Is my understanding correct?

Best regards,
Hayato Kuroda
FUJITSU LIMITED

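To make the concern in point 4 concrete, here is a minimal standalone sketch in C of what "caching" buys a plugin. It is not PostgreSQL code and not taken from the PoC: `Oid` is a local stand-in typedef, and `FilterCacheEntry`, `lookup_skip_from_catalog`, `should_skip_change`, `invalidate_filter_cache`, and the `catalog_lookups` counter are all hypothetical names. The "expensive lookup" only stands in for the catalog access that would need a transaction.

```c
#include <stdbool.h>
#include <string.h>

#define FILTER_CACHE_SLOTS 64

typedef unsigned int Oid;		/* stand-in for PostgreSQL's Oid (assumption) */

/* One cached filtering decision per relation (hypothetical layout). */
typedef struct FilterCacheEntry
{
	Oid			relid;
	bool		valid;
	bool		skip;
} FilterCacheEntry;

static FilterCacheEntry filter_cache[FILTER_CACHE_SLOTS];

/* Counts how often the expensive path is taken. */
static int	catalog_lookups = 0;

/*
 * Stand-in for the expensive path: in a real plugin this is where a
 * transaction would have to be opened to consult the catalog. The
 * even/odd rule is arbitrary and only makes the sketch deterministic.
 */
static bool
lookup_skip_from_catalog(Oid relid)
{
	catalog_lookups++;
	return (relid % 2) == 0;
}

/* Return the cached decision if present; otherwise look it up once. */
static bool
should_skip_change(Oid relid)
{
	FilterCacheEntry *entry = &filter_cache[relid % FILTER_CACHE_SLOTS];

	if (entry->valid && entry->relid == relid)
		return entry->skip;

	entry->relid = relid;
	entry->skip = lookup_skip_from_catalog(relid);
	entry->valid = true;
	return entry->skip;
}

/* Drop all cached decisions, e.g. when an invalidation arrives. */
static void
invalidate_filter_cache(void)
{
	memset(filter_cache, 0, sizeof(filter_cache));
}
```

With a cache like this, repeated changes to the same relation take the expensive path once per invalidation; without it, every change would take that path, which is the degradation described above.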
From 1fc88208de13fae8d287d456b899d4ebe01558be Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Fri, 31 Jan 2025 21:45:31 +0900
Subject: [PATCH v20250131] Try to add tests

---
 contrib/test_decoding/Makefile            |  2 +-
 contrib/test_decoding/expected/filter.out | 26 +++++++++++++++++++++++
 contrib/test_decoding/meson.build         |  1 +
 contrib/test_decoding/sql/filter.sql      | 16 ++++++++++++++
 contrib/test_decoding/test_decoding.c     | 21 ++++++++++++++++++
 5 files changed, 65 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/filter.out
 create mode 100644 contrib/test_decoding/sql/filter.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509a..a66ab2cbf8 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream filter
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
diff --git a/contrib/test_decoding/expected/filter.out b/contrib/test_decoding/expected/filter.out
new file mode 100644
index 0000000000..a3b43f2772
--- /dev/null
+++ b/contrib/test_decoding/expected/filter.out
@@ -0,0 +1,26 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Create two tables
+CREATE TABLE test(id int);
+CREATE TABLE test_skipped(id int);
+-- Changes for XXX_skipped are skipped
+BEGIN;
+INSERT INTO test_skipped VALUES (1);
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 54d65d3f30..9f1f98e6e2 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -41,6 +41,7 @@ tests += {
       'stats',
       'twophase',
       'twophase_stream',
+      'filter',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/sql/filter.sql b/contrib/test_decoding/sql/filter.sql
new file mode 100644
index 0000000000..08ec80eb03
--- /dev/null
+++ b/contrib/test_decoding/sql/filter.sql
@@ -0,0 +1,16 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- Create two tables
+CREATE TABLE test(id int);
+CREATE TABLE test_skipped(id int);
+
+-- Changes for XXX_skipped are skipped
+BEGIN;
+INSERT INTO test_skipped VALUES (1);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 0113b19636..789a4bc2aa 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -68,6 +68,9 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
 							   ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id);
+static bool pg_decode_filter_change(LogicalDecodingContext *ctx, Oid relid,
+									ReorderBufferChangeType change_type,
+									bool in_txn, bool *cache_valid);
 static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr lsn,
 							  bool transactional, const char *prefix,
@@ -133,6 +136,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
+	cb->filter_change_cb = pg_decode_filter_change;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
@@ -467,6 +471,23 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static bool
+pg_decode_filter_change(LogicalDecodingContext *ctx, Oid relid,
+						ReorderBufferChangeType change_type, bool in_txn,
+						bool *cache_valid)
+{
+	if (in_txn)
+	{
+		Relation	relation = RelationIdGetRelation(relid);
+		char	   *relname = NameStr(relation->rd_rel->relname);
+
+		if (strstr(relname, "_skipped") != NULL)
+			return true;
+	}
+
+	return false;
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
-- 
2.43.5
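
A note on the name test in the callback above: strstr() matches "_skipped" anywhere in the relation name, while the test comment ("Changes for XXX_skipped are skipped") reads as if a suffix match is intended. The standalone sketch below contrasts the two. It is plain C, independent of PostgreSQL, and both helper names (`name_contains_skipped`, `name_ends_with_skipped`) are hypothetical; whether the stricter rule is actually wanted is an assumption, not something stated in the thread.

```c
#include <stdbool.h>
#include <string.h>

/* Substring match: the same test the attached callback does with strstr(). */
static bool
name_contains_skipped(const char *relname)
{
	return strstr(relname, "_skipped") != NULL;
}

/* Suffix match: a hypothetical stricter alternative. */
static bool
name_ends_with_skipped(const char *relname)
{
	static const char suffix[] = "_skipped";
	size_t		namelen = strlen(relname);
	size_t		suffixlen = sizeof(suffix) - 1;

	if (namelen < suffixlen)
		return false;

	/* Compare only the tail of the name against the suffix. */
	return strcmp(relname + namelen - suffixlen, suffix) == 0;
}
```

Both helpers agree on "test" and "test_skipped"; they differ on a name such as "test_skipped_log", which only the substring version treats as skipped. Separately, RelationIdGetRelation() returns a reference-counted relcache entry and can return NULL, so the callback would presumably also need a validity check and a matching RelationClose(); that part is not shown here.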