Identity sequences shouldn't be addressed directly by name in normal use. Therefore, requiring them to be added directly to publications is a faulty interface. I think they should be considered included in a publication automatically whenever their owning table is included. See the attached patch for a sketch. (It doesn't quite work yet, but I think it shows the idea.)

If we end up keeping the logical replication of sequences feature, I think something like this should be added, too.
From ef7699a6f5ebe5d76f2f7ce14975b515a97df925 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Wed, 6 Apr 2022 14:47:36 +0200
Subject: [PATCH v1] Automatically replicate identity sequences with their
 tables

This automatically includes an identity sequence in a publication if
the owning table is part of the publication (directly or indirectly).
---
 src/backend/catalog/pg_publication.c        | 10 ++++++++++
 src/backend/replication/pgoutput/pgoutput.c | 18 ++++++++++++++++++
 src/test/subscription/t/030_sequences.pl    | 18 +++++++++++++++++-
 3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9fe3b18926..0f9bf1a28f 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1392,6 +1392,7 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS)
                {
                        List       *relids,
                                           *schemarelids;
+                       ListCell   *lc;
 
                        relids = GetPublicationRelations(publication->oid,
                                                                                
         PUB_OBJTYPE_SEQUENCE,
@@ -1404,6 +1405,15 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS)
                                                                                
                                        PUBLICATION_PART_ROOT :
                                                                                
                                        PUBLICATION_PART_LEAF);
                        sequences = list_concat_unique_oid(relids, 
schemarelids);
+
+                       foreach(lc, GetPublicationRelations(publication->oid,
+                                                                               
                PUB_OBJTYPE_TABLE,
+                                                                               
                publication->pubviaroot ? PUBLICATION_PART_ROOT : 
PUBLICATION_PART_LEAF))
+                       {
+                               Oid                     relid = lfirst_oid(lc);
+
+                               sequences = list_concat_unique_oid(sequences, 
getOwnedSequences(relid));
+                       }
                }
 
                funcctx->user_fctx = (void *) sequences;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9d33630464..1044458a47 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/tupconvert.h"
+#include "catalog/dependency.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_namespace.h"
@@ -2206,6 +2207,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
                                        }
                                }
 
+                               if (relation->rd_rel->relkind == 
RELKIND_SEQUENCE)
+                               {
+                                       Oid                     tableId;
+                                       int32           colId;
+
+                                       if (sequenceIsOwned(relid, 
DEPENDENCY_INTERNAL, &tableId, &colId))
+                                       {
+                                               if 
(GetRelationPublications(tableId) ||
+                                                       
GetSchemaPublications(get_rel_namespace(tableId),
+                                                                               
                  PUB_OBJTYPE_TABLE) ||
+                                                       pub->alltables)
+                                               {
+                                                       ancestor_published = 
true;
+                                               }
+                                       }
+                               }
+
                                if (list_member_oid(pubids, pub->oid) ||
                                        list_member_oid(schemaPubids, pub->oid) 
||
                                        ancestor_published)
diff --git a/src/test/subscription/t/030_sequences.pl b/src/test/subscription/t/030_sequences.pl
index 9ae3c03d7d..7b5b735bff 100644
--- a/src/test/subscription/t/030_sequences.pl
+++ b/src/test/subscription/t/030_sequences.pl
@@ -22,6 +22,7 @@
 my $ddl = qq(
        CREATE TABLE seq_test (v BIGINT);
        CREATE SEQUENCE s;
+       CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY);
 );
 
 # Setup structure on the publisher
@@ -33,6 +34,7 @@
        CREATE TABLE seq_test (v BIGINT);
        CREATE SEQUENCE s;
        CREATE SEQUENCE s2;
+       CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY);
 );
 
 $node_subscriber->safe_psql('postgres', $ddl);
@@ -45,8 +47,14 @@
 $node_publisher->safe_psql('postgres',
        "ALTER PUBLICATION seq_pub ADD SEQUENCE s");
 
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION tab_pub");
+
+$node_publisher->safe_psql('postgres',
+       "ALTER PUBLICATION tab_pub ADD TABLE identity_test");
+
 $node_subscriber->safe_psql('postgres',
-       "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' 
PUBLICATION seq_pub"
+       "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' 
PUBLICATION seq_pub, tab_pub"
 );
 
 $node_publisher->wait_for_catchup('seq_sub');
@@ -62,6 +70,7 @@
        'postgres', qq(
        -- generate a number of values using the sequence
        INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100);
+       INSERT INTO identity_test SELECT * FROM generate_series(1,100);
 ));
 
 $node_publisher->wait_for_catchup('seq_sub');
@@ -75,6 +84,13 @@
 is( $result, '132|0|t',
        'initial test data replicated');
 
+is($node_subscriber->safe_psql('postgres', qq(SELECT max(v) FROM 
identity_test)),
+       '100',
+       'identity table replicated');
+
+is($node_subscriber->safe_psql('postgres', qq(SELECT * FROM 
identity_test_v_seq)),
+       '100|0|t',
+       'identity sequence advanced');
 
 # advance the sequence in a rolled-back transaction - the rollback
 # does not wait for the replication, so we could see any intermediate state
-- 
2.35.1

Reply via email to