Dear Ajin, Hou,

> - Snapshot construction

I understand the approach: we do not try to filter for all workloads; we just
do it on a best-effort basis.

I played with your PoC and here are my comments.

1.
Can you add tests for better understanding? I tried adding one to test_decoding,
as attached, but it could not pass the regression tests. The cases "stream.sql"
and "twophase_stream.sql" failed.

2.
I think we can extend the skip mechanism to
UPDATE/DELETE/MultiInsert/SpecConfirm.
Regarding TRUNCATE, I'm not sure we can handle the TRUNCATE case because we
can't track the RelFileLocator anymore.

3.
Both ReorderBuffer and RelFileLocatorFilterCache have the same lifetime, but
RelFileLocatorFilterCache is provided as a global variable, which is quite
strange.
Can you somehow avoid this? I considered an idea but could not make it work
because we do not have an API to unregister the relcache callback.

4.
For output plugins which do not cache the catalog, it may not be possible to do
filtering without a transaction. For them, the early filter may degrade
performance because it requires opening a transaction for every change.
Is my understanding correct?

Best regards,
Hayato Kuroda
FUJITSU LIMITED

From 1fc88208de13fae8d287d456b899d4ebe01558be Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Fri, 31 Jan 2025 21:45:31 +0900
Subject: [PATCH v20250131] Try to add tests

---
 contrib/test_decoding/Makefile            |  2 +-
 contrib/test_decoding/expected/filter.out | 26 +++++++++++++++++++++++
 contrib/test_decoding/meson.build         |  1 +
 contrib/test_decoding/sql/filter.sql      | 16 ++++++++++++++
 contrib/test_decoding/test_decoding.c     | 21 ++++++++++++++++++
 5 files changed, 65 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/filter.out
 create mode 100644 contrib/test_decoding/sql/filter.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509a..a66ab2cbf8 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding 
output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
        decoding_into_rel binary prepared replorigin time messages \
-       spill slot truncate stream stats twophase twophase_stream
+       spill slot truncate stream stats twophase twophase_stream filter
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
        oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
        twophase_snapshot slot_creation_error catalog_change_snapshot \
diff --git a/contrib/test_decoding/expected/filter.out 
b/contrib/test_decoding/expected/filter.out
new file mode 100644
index 0000000000..a3b43f2772
--- /dev/null
+++ b/contrib/test_decoding/expected/filter.out
@@ -0,0 +1,26 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Create two tables
+CREATE TABLE test(id int);
+CREATE TABLE test_skipped(id int);
+-- Changes for XXX_skipped are skipped
+BEGIN;
+INSERT INTO test_skipped VALUES (1);
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build 
b/contrib/test_decoding/meson.build
index 54d65d3f30..9f1f98e6e2 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -41,6 +41,7 @@ tests += {
       'stats',
       'twophase',
       'twophase_stream',
+      'filter',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/sql/filter.sql 
b/contrib/test_decoding/sql/filter.sql
new file mode 100644
index 0000000000..08ec80eb03
--- /dev/null
+++ b/contrib/test_decoding/sql/filter.sql
@@ -0,0 +1,16 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
'test_decoding');
+
+-- Create two tables
+CREATE TABLE test(id int);
+CREATE TABLE test_skipped(id int);
+
+-- Changes for XXX_skipped are skipped
+BEGIN;
+INSERT INTO test_skipped VALUES (1);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c 
b/contrib/test_decoding/test_decoding.c
index 0113b19636..789a4bc2aa 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -68,6 +68,9 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
                                                           ReorderBufferChange 
*change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
                                                         RepOriginId origin_id);
+static bool pg_decode_filter_change(LogicalDecodingContext *ctx, Oid relid,
+                                                                       
ReorderBufferChangeType change_type,
+                                                                       bool 
in_txn, bool *cache_valid);
 static void pg_decode_message(LogicalDecodingContext *ctx,
                                                          ReorderBufferTXN 
*txn, XLogRecPtr lsn,
                                                          bool transactional, 
const char *prefix,
@@ -133,6 +136,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->truncate_cb = pg_decode_truncate;
        cb->commit_cb = pg_decode_commit_txn;
        cb->filter_by_origin_cb = pg_decode_filter;
+       cb->filter_change_cb = pg_decode_filter_change;
        cb->shutdown_cb = pg_decode_shutdown;
        cb->message_cb = pg_decode_message;
        cb->filter_prepare_cb = pg_decode_filter_prepare;
@@ -467,6 +471,23 @@ pg_decode_filter(LogicalDecodingContext *ctx,
        return false;
 }
 
+static bool
+pg_decode_filter_change(LogicalDecodingContext *ctx, Oid relid,
+                                               ReorderBufferChangeType 
change_type, bool in_txn,
+                                               bool *cache_valid)
+{
+       if (in_txn)
+       {
+               Relation        relation = RelationIdGetRelation(relid);
+               char       *relname = NameStr(relation->rd_rel->relname);
+
+               if (strstr(relname, "_skipped") != NULL)
+                       return true;
+       }
+
+       return false;
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
-- 
2.43.5

Reply via email to