Identity sequences shouldn't be addressed directly by name in normal
use. Therefore, requiring them to be added directly to publications is
a faulty interface. I think they should be considered included in a
publication automatically when their owning table is. See attached
patch for a sketch. (It doesn't actually work quite yet, but it shows
the idea, I think.)
If we end up keeping the logical replication of sequences feature, I
think something like this should be added, too.

From ef7699a6f5ebe5d76f2f7ce14975b515a97df925 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Wed, 6 Apr 2022 14:47:36 +0200
Subject: [PATCH v1] Automatically replicate identity sequences with their
 tables
This automatically includes an identity sequence into a publication if
the owning table is part of the publication (directly or indirectly).
---
src/backend/catalog/pg_publication.c | 10 ++++++++++
src/backend/replication/pgoutput/pgoutput.c | 18 ++++++++++++++++++
src/test/subscription/t/030_sequences.pl | 18 +++++++++++++++++-
3 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/src/backend/catalog/pg_publication.c
b/src/backend/catalog/pg_publication.c
index 9fe3b18926..0f9bf1a28f 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1392,6 +1392,7 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS)
{
List *relids,
*schemarelids;
+ ListCell *lc;
relids = GetPublicationRelations(publication->oid,
PUB_OBJTYPE_SEQUENCE,
@@ -1404,6 +1405,15 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS)
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
sequences = list_concat_unique_oid(relids,
schemarelids);
+
+ foreach(lc, GetPublicationRelations(publication->oid,
+
PUB_OBJTYPE_TABLE,
+
publication->pubviaroot ? PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF))
+ {
+ Oid relid = lfirst_oid(lc);
+
+ sequences = list_concat_unique_oid(sequences,
getOwnedSequences(relid));
+ }
}
funcctx->user_fctx = (void *) sequences;
diff --git a/src/backend/replication/pgoutput/pgoutput.c
b/src/backend/replication/pgoutput/pgoutput.c
index 9d33630464..1044458a47 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/tupconvert.h"
+#include "catalog/dependency.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_namespace.h"
@@ -2206,6 +2207,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
}
}
+ if (relation->rd_rel->relkind ==
RELKIND_SEQUENCE)
+ {
+ Oid tableId;
+ int32 colId;
+
+ if (sequenceIsOwned(relid,
DEPENDENCY_INTERNAL, &tableId, &colId))
+ {
+ if
(GetRelationPublications(tableId) ||
+
GetSchemaPublications(get_rel_namespace(tableId),
+
PUB_OBJTYPE_TABLE) ||
+ pub->alltables)
+ {
+ ancestor_published =
true;
+ }
+ }
+ }
+
if (list_member_oid(pubids, pub->oid) ||
list_member_oid(schemaPubids, pub->oid)
||
ancestor_published)
diff --git a/src/test/subscription/t/030_sequences.pl
b/src/test/subscription/t/030_sequences.pl
index 9ae3c03d7d..7b5b735bff 100644
--- a/src/test/subscription/t/030_sequences.pl
+++ b/src/test/subscription/t/030_sequences.pl
@@ -22,6 +22,7 @@
my $ddl = qq(
CREATE TABLE seq_test (v BIGINT);
CREATE SEQUENCE s;
+ CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY);
);
# Setup structure on the publisher
@@ -33,6 +34,7 @@
CREATE TABLE seq_test (v BIGINT);
CREATE SEQUENCE s;
CREATE SEQUENCE s2;
+ CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY);
);
$node_subscriber->safe_psql('postgres', $ddl);
@@ -45,8 +47,14 @@
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION seq_pub ADD SEQUENCE s");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tab_pub");
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tab_pub ADD TABLE identity_test");
+
$node_subscriber->safe_psql('postgres',
- "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr'
PUBLICATION seq_pub"
+ "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr'
PUBLICATION seq_pub, tab_pub"
);
$node_publisher->wait_for_catchup('seq_sub');
@@ -62,6 +70,7 @@
'postgres', qq(
-- generate a number of values using the sequence
INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100);
+ INSERT INTO identity_test SELECT * FROM generate_series(1,100);
));
$node_publisher->wait_for_catchup('seq_sub');
@@ -75,6 +84,13 @@
is( $result, '132|0|t',
'initial test data replicated');
+is($node_subscriber->safe_psql('postgres', qq(SELECT max(v) FROM
identity_test)),
+ '100',
+ 'identity table replicated');
+
+is($node_subscriber->safe_psql('postgres', qq(SELECT * FROM
identity_test_v_seq)),
+ '100|0|t',
+ 'identity sequence advanced');
# advance the sequence in a rolled-back transaction - the rollback
# does not wait for the replication, so we could see any intermediate state
--
2.35.1