Further review (based on 20220310 patch):

 doc/src/sgml/ref/create_publication.sgml       |   3 +

For the clauses added to the synopsis, descriptions should be added
below.  See attached patch for a start.

 src/backend/commands/sequence.c                |  79 ++

There is quite a bit of overlap between ResetSequence() and
ResetSequence2(), but I couldn't see a good way to combine them that
genuinely saves code and complexity.  So maybe it's ok.

Actually, ResetSequence2() is not really "reset", it's just "set".
Maybe pick a different function name.

 src/backend/commands/subscriptioncmds.c        | 272 +++++++

The code added in AlterSubscription_refresh() seems to be entirely
copy-and-paste from the tables case.  I think this could be combined
by concatenating the lists from fetch_table_list() and
fetch_sequence_list() and looping over it once.  The same also applies
to CreateSubscription(), although the code duplication is smaller
there.

This in turn means that fetch_table_list() and fetch_sequence_list()
can be combined, so that you don't actually need any extensive new
code in CreateSubscription() and AlterSubscription_refresh() for
sequences.  This could go on: you could combine more of the underlying
code, like pg_publication_tables and pg_publication_sequences and so
on.
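
To make the single-loop idea concrete, here is a minimal standalone
sketch in plain C.  It does not use the PostgreSQL List API; PubRel,
fetch_all_relations() and subscribe_relations() are hypothetical names,
and the two static arrays merely stand in for what fetch_table_list()
and fetch_sequence_list() would return.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical: one entry per published relation, table or sequence. */
typedef struct PubRel
{
    const char *nspname;
    const char *relname;
    char        relkind;    /* 'r' = table, 'S' = sequence */
} PubRel;

/* Stand-ins for the results of the two existing fetch functions. */
static const PubRel demo_tables[] = {
    {"public", "t1", 'r'},
    {"public", "t2", 'r'},
};
static const PubRel demo_sequences[] = {
    {"public", "s1", 'S'},
};

/*
 * Concatenate both lists into "out" (capacity "cap" entries).
 * Returns the total number of entries, or 0 if they do not fit.
 */
size_t
fetch_all_relations(PubRel *out, size_t cap)
{
    size_t      ntab = sizeof(demo_tables) / sizeof(demo_tables[0]);
    size_t      nseq = sizeof(demo_sequences) / sizeof(demo_sequences[0]);

    assert(out != NULL);
    if (ntab + nseq > cap)
        return 0;
    memcpy(out, demo_tables, sizeof(demo_tables));
    memcpy(out + ntab, demo_sequences, sizeof(demo_sequences));
    return ntab + nseq;
}

/*
 * One loop over the combined list.  The per-relation work is shared;
 * only the small relkind-specific part branches.  Here the "work" is
 * just counting, as a placeholder.
 */
void
subscribe_relations(const PubRel *rels, size_t n,
                    size_t *ntables, size_t *nsequences)
{
    for (size_t i = 0; i < n; i++)
    {
        if (rels[i].relkind == 'S')
            (*nsequences)++;
        else
            (*ntables)++;
    }
}
```

With this shape, the callers would need one fetch call and one loop
instead of a copy of each for sequences.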

 src/backend/replication/logical/proto.c        |  52 ++

The added protocol message needs to be documented.  See attached
patch for a start.

The sequence message does not contain the sequence Oid, unlike the
relation message.  Would that be good to add?
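
For reference, the field order described in the attached protocol.sgml
hunk can be sketched as a standalone encoder.  This is not the code in
proto.c: SeqMsg, put_int64(), put_string() and encode_sequence_msg() are
hypothetical names, the transaction ID field that is only present for
streamed transactions is left out, and no Oid field is included since
the current message has none.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical in-memory form of a sequence message. */
typedef struct SeqMsg
{
    uint64_t    lsn;            /* meaning still marked FIXME in the docs */
    const char *nspname;        /* "" stands for pg_catalog */
    const char *relname;
    int         transactional;
    int64_t     last_value;
    int64_t     log_cnt;
    int         is_called;
} SeqMsg;

/* Append a 64-bit value in network (big-endian) byte order. */
size_t
put_int64(unsigned char *buf, size_t pos, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
        buf[pos++] = (unsigned char) (v >> (8 * i));
    return pos;
}

/* Append a string including its terminating NUL byte. */
size_t
put_string(unsigned char *buf, size_t pos, const char *s)
{
    size_t      len = strlen(s) + 1;

    memcpy(buf + pos, s, len);
    return pos + len;
}

/* Returns the number of bytes written, or 0 if "buf" is too small. */
size_t
encode_sequence_msg(unsigned char *buf, size_t cap, const SeqMsg *m)
{
    size_t      need;
    size_t      pos = 0;

    assert(buf != NULL && m != NULL);
    need = 1 + 1 + 8 + (strlen(m->nspname) + 1) + (strlen(m->relname) + 1)
        + 1 + 8 + 8 + 1;
    if (need > cap)
        return 0;

    buf[pos++] = 'X';                       /* Byte1('X') */
    buf[pos++] = 0;                         /* Int8(0): flags, unused */
    pos = put_int64(buf, pos, m->lsn);      /* Int64 (XLogRecPtr) */
    pos = put_string(buf, pos, m->nspname); /* String: namespace */
    pos = put_string(buf, pos, m->relname); /* String: relation name */
    buf[pos++] = m->transactional ? 1 : 0;  /* Int8 */
    pos = put_int64(buf, pos, (uint64_t) m->last_value);
    pos = put_int64(buf, pos, (uint64_t) m->log_cnt);
    buf[pos++] = m->is_called ? 1 : 0;      /* Int8 */
    return pos;
}
```

Note that the receiver can only identify the sequence by the two name
strings in this layout.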

 src/backend/replication/logical/worker.c       |  56 ++

Maybe the Asserts in apply_handle_sequence() should be elogs.  These
are checking what is sent over the network, so we don't want a
bad/evil peer to be able to trigger asserts.  And in non-assert builds
these conditions would be unchecked.
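
To illustrate the distinction, here is a standalone sketch (not the
worker.c code) that keeps an assert only for an internal invariant and
validates peer-supplied bytes with run-time checks.  DecodeStatus and
check_sequence_header() are hypothetical, as are the specific
conditions checked; in the backend the failure path would be
elog(ERROR, ...) or ereport() rather than a return code.

```c
#include <assert.h>
#include <stddef.h>

typedef enum DecodeStatus
{
    DECODE_OK,
    DECODE_TOO_SHORT,
    DECODE_BAD_TYPE,
    DECODE_BAD_FLAGS
} DecodeStatus;

DecodeStatus
check_sequence_header(const unsigned char *buf, size_t len)
{
    /*
     * Internal invariant: our own callers never pass NULL.  An assert
     * is fine here because no peer can influence it.
     */
    assert(buf != NULL);

    /*
     * Everything below comes from the network, so it is checked at run
     * time and stays checked when asserts are compiled out.
     */
    if (len < 2)
        return DECODE_TOO_SHORT;
    if (buf[0] != 'X')
        return DECODE_BAD_TYPE;
    if (buf[1] != 0)            /* flags byte is currently unused */
        return DECODE_BAD_FLAGS;
    return DECODE_OK;
}
```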

 src/backend/replication/pgoutput/pgoutput.c    |  82 +-

I find the code in get_rel_sync_entry() confusing.  You add a section for

if (!publish && is_sequence)

but then shouldn't the code below that be something like

if (!publish && !is_sequence)

 src/bin/pg_dump/t/002_pg_dump.pl               |  38 +-

This adds a new publication "pub4", but the tests already contain a
"pub4".  I'm not sure why this even works, but perhaps the new one
should be "pub5", unless there is a deeper meaning.

 src/include/catalog/pg_publication_namespace.h |   3 +-

I don't like how the distinction between table and sequence is done
using a bool field.  That also affects the APIs in pg_publication.c
and publicationcmds.c especially.  There is a lot of unadorned "true"
and "false" being passed around that isn't very clear, and it all
appears to originate at this catalog.  I think we could use a char
field here that uses the relkind constants.  That would also make the
code in pg_publication.c etc. slightly clearer.
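
A sketch of how call sites would read with a char field.
RELKIND_RELATION ('r') and RELKIND_SEQUENCE ('S') are the existing
pg_class constants, redefined here only so the example stands alone;
PubSchemaEntry, schema_is_published() and the field name objtype are
hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same values as in pg_class.h, repeated to keep this self-contained. */
#define RELKIND_RELATION 'r'
#define RELKIND_SEQUENCE 'S'

/* Hypothetical catalog row: which schema is published, and for what. */
typedef struct PubSchemaEntry
{
    unsigned int pubid;
    unsigned int nspid;
    char        objtype;    /* RELKIND_RELATION or RELKIND_SEQUENCE */
} PubSchemaEntry;

bool
schema_is_published(const PubSchemaEntry *entries, size_t n,
                    unsigned int pubid, unsigned int nspid, char objtype)
{
    /* Caller bug if some other value is passed. */
    assert(objtype == RELKIND_RELATION || objtype == RELKIND_SEQUENCE);

    for (size_t i = 0; i < n; i++)
    {
        if (entries[i].pubid == pubid &&
            entries[i].nspid == nspid &&
            entries[i].objtype == objtype)
            return true;
    }
    return false;
}
```

A call then reads schema_is_published(..., RELKIND_SEQUENCE) rather
than schema_is_published(..., true), which documents itself at the
call site.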


See attached patch for more small tweaks.

Your patch still contains a number of XXX and FIXME comments, which in my assessment are all more or less correct, so I didn't comment on those separately.

Other than that, this seems pretty good.

Earlier in the thread I commented on some aspects of the new grammar (e.g., do we need FOR ALL SEQUENCES?). I think this would be useful to review again after all the new logical replication patches are in. I don't want to hold up this patch for that at this point.
From bdded82050841d3b71308ce82110efd21d99ea53 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Sun, 13 Mar 2022 07:38:46 +0100
Subject: [PATCH] fixup! Add support for decoding sequences to built-in
 replication

---
 doc/src/sgml/protocol.sgml                | 119 ++++++++++++++++++++++
 doc/src/sgml/ref/alter_publication.sgml   |   2 +-
 doc/src/sgml/ref/create_publication.sgml  |  42 +++++---
 src/backend/catalog/pg_publication.c      |   8 +-
 src/backend/commands/subscriptioncmds.c   |   2 +-
 src/backend/parser/gram.y                 |  14 ---
 src/backend/replication/logical/worker.c  |   2 +-
 src/bin/pg_dump/pg_dump.c                 |   6 +-
 src/test/regress/expected/publication.out |   2 +-
 src/test/regress/sql/object_address.sql   |   1 +
 src/test/regress/sql/publication.sql      |   2 +-
 src/test/subscription/t/029_sequences.pl  |  14 +--
 12 files changed, 165 insertions(+), 49 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 9178c779ba..49c05e1866 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -7055,6 +7055,125 @@ <title>Logical Replication Message Formats</title>
 </listitem>
 </varlistentry>
 
+<varlistentry id="protocol-logicalrep-message-formats-Sequence">
+<term>
+Sequence
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('X')
+</term>
+<listitem>
+<para>
+                Identifies the message as a sequence message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32 (TransactionId)
+</term>
+<listitem>
+<para>
+               Xid of the transaction (only present for streamed transactions).
+               This field is available since protocol version 2.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8(0)
+</term>
+<listitem>
+<para>
+                Flags; currently unused.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int64 (XLogRecPtr)
+</term>
+<listitem>
+<para>
+                The LSN of FIXME.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        String
+</term>
+<listitem>
+<para>
+                Namespace (empty string for <literal>pg_catalog</literal>).
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        String
+</term>
+<listitem>
+<para>
+                Relation name.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                1 if the sequence update is transactional, 0 otherwise.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int64
+</term>
+<listitem>
+<para>
+                <structfield>last_value</structfield> value of the sequence.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int64
+</term>
+<listitem>
+<para>
+                <structfield>log_cnt</structfield> value of the sequence.
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                <structfield>is_called</structfield> value of the sequence.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
 <varlistentry id="protocol-logicalrep-message-formats-Type">
 <term>
 Type
diff --git a/doc/src/sgml/ref/alter_publication.sgml 
b/doc/src/sgml/ref/alter_publication.sgml
index 36c9a5f438..5dacb732b6 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -31,7 +31,7 @@
 <phrase>where <replaceable class="parameter">publication_object</replaceable> 
is one of:</phrase>
 
     TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * 
] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
-    SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] 
[, ... ]
+    SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [, ... 
]
     ALL TABLES IN SCHEMA { <replaceable 
class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
     ALL SEQUENCES IN SCHEMA { <replaceable 
class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
diff --git a/doc/src/sgml/ref/create_publication.sgml 
b/doc/src/sgml/ref/create_publication.sgml
index f72318e97d..286529e749 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -66,6 +66,20 @@ <title>Parameters</title>
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>FOR SEQUENCE</literal></term>
+    <listitem>
+     <para>
+      Specifies a list of sequences to add to the publication.
+     </para>
+
+     <para>
+      Specifying a sequence that is part of a schema specified by <literal>FOR
+      ALL SEQUENCES IN SCHEMA</literal> is not supported.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>FOR TABLE</literal></term>
     <listitem>
@@ -111,26 +125,28 @@ <title>Parameters</title>
    </varlistentry>
 
    <varlistentry>
+    <term><literal>FOR ALL SEQUENCES</literal></term>
     <term><literal>FOR ALL TABLES</literal></term>
     <listitem>
      <para>
-      Marks the publication as one that replicates changes for all tables in
-      the database, including tables created in the future.
+      Marks the publication as one that replicates changes for all 
sequences/tables in
+      the database, including sequences/tables created in the future.
      </para>
     </listitem>
    </varlistentry>
 
    <varlistentry>
+    <term><literal>FOR ALL SEQUENCES IN SCHEMA</literal></term>
     <term><literal>FOR ALL TABLES IN SCHEMA</literal></term>
     <listitem>
      <para>
-      Marks the publication as one that replicates changes for all tables in
-      the specified list of schemas, including tables created in the future.
+      Marks the publication as one that replicates changes for all 
sequences/tables in
+      the specified list of schemas, including sequences/tables created in the 
future.
      </para>
 
      <para>
-      Specifying a schema along with a table which belongs to the specified
-      schema using <literal>FOR TABLE</literal> is not supported.
+      Specifying a schema along with a sequence/table which belongs to the 
specified
+      schema using <literal>FOR SEQUENCE</literal>/<literal>FOR 
TABLE</literal> is not supported.
      </para>
 
      <para>
@@ -205,10 +221,9 @@ <title>Parameters</title>
   <title>Notes</title>
 
   <para>
-   If <literal>FOR TABLE</literal>, <literal>FOR ALL TABLES</literal> or
-   <literal>FOR ALL TABLES IN SCHEMA</literal> are not specified, then the
-   publication starts out with an empty set of tables.  That is useful if
-   tables or schemas are to be added later.
+   If <literal>FOR TABLE</literal>, <literal>FOR SEQUENCE</literal>, etc. is
+   not specified, then the publication starts out with an empty set of tables
+   and sequences.  That is useful if objects are to be added later.
   </para>
 
   <para>
@@ -223,10 +238,9 @@ <title>Notes</title>
   </para>
 
   <para>
-   To add a table to a publication, the invoking user must have ownership
-   rights on the table.  The <command>FOR ALL TABLES</command> and
-   <command>FOR ALL TABLES IN SCHEMA</command> clauses require the invoking
-   user to be a superuser.
+   To add a table or sequence to a publication, the invoking user must have
+   ownership rights on the table or sequence.  The <command>FOR ALL
+   ...</command> clauses require the invoking user to be a superuser.
   </para>
 
   <para>
diff --git a/src/backend/catalog/pg_publication.c 
b/src/backend/catalog/pg_publication.c
index d866e8a9b2..8e26e0cee2 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -636,7 +636,7 @@ GetAllSequencesPublications(void)
        SysScanDesc scan;
        HeapTuple       tup;
 
-       /* Find all publications that are marked as for all tables. */
+       /* Find all publications that are marked as for all sequences. */
        rel = table_open(PublicationRelationId, AccessShareLock);
 
        ScanKeyInit(&scankey,
@@ -892,11 +892,7 @@ GetAllSchemaPublicationRelations(Oid pubid, bool sequences,
 }
 
 /*
- * Gets list of all relation published by FOR ALL TABLES publication(s).
- *
- * If the publication publishes partition changes via their respective root
- * partitioned tables, we must exclude partitions in favor of including the
- * root partitioned tables.
+ * Gets list of all relation published by FOR ALL SEQUENCES publication(s).
  */
 List *
 GetAllSequencesPublicationRelations(void)
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index 5beb67e765..1c70c4369a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1859,7 +1859,7 @@ fetch_sequence_list(WalReceiverConn *wrconn, List 
*publications)
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
-                               (errmsg("could not receive list of replicated 
tables from the publisher: %s",
+                               (errmsg("could not receive list of replicated 
sequences from the publisher: %s",
                                                res->err)));
 
        /* Process tables. */
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 9097ac3fab..6ff0ddd62b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9705,13 +9705,6 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes 
OWNER TO RoleSpec
  *
  * CREATE PUBLICATION FOR pub_obj [, ...] [WITH options]
  *
- * pub_obj is one of:
- *
- *             TABLE table [, ...]
- *             SEQUENCE table [, ...]
- *             ALL TABLES IN SCHEMA schema [, ...]
- *             ALL SEQUENCES IN SCHEMA schema [, ...]
- *
  *****************************************************************************/
 
 CreatePublicationStmt:
@@ -9868,13 +9861,6 @@ pub_obj_list:    PublicationObjSpec
  *
  * ALTER PUBLICATION name SET pub_obj [, ...]
  *
- * pub_obj is one of:
- *
- *             TABLE table_name [, ...]
- *             SEQUENCE table_name [, ...]
- *             ALL TABLES IN SCHEMA schema_name [, ...]
- *             ALL SEQUENCES IN SCHEMA schema_name [, ...]
- *
  *****************************************************************************/
 
 AlterPublicationStmt:
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index 860c31fa05..1282c15f92 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -142,9 +142,9 @@
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_tablespace.h"
+#include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
-#include "commands/sequence.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/execPartition.h"
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index ef8c6e43c6..35a8fc7631 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3819,19 +3819,19 @@ getPublications(Archive *fout, int *numPublications)
                appendPQExpBuffer(query,
                                                  "SELECT p.tableoid, p.oid, 
p.pubname, "
                                                  "p.pubowner, "
-                                                 "p.puballtables, false AS 
p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false 
AS p.pubsequence, p.pubviaroot "
+                                                 "p.puballtables, false AS 
puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS 
pubsequence, p.pubviaroot "
                                                  "FROM pg_publication p");
        else if (fout->remoteVersion >= 110000)
                appendPQExpBuffer(query,
                                                  "SELECT p.tableoid, p.oid, 
p.pubname, "
                                                  "p.pubowner, "
-                                                 "p.puballtables, false AS 
p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false 
AS p.pubsequence, false AS pubviaroot "
+                                                 "p.puballtables, false AS 
puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS 
pubsequence, false AS pubviaroot "
                                                  "FROM pg_publication p");
        else
                appendPQExpBuffer(query,
                                                  "SELECT p.tableoid, p.oid, 
p.pubname, "
                                                  "p.pubowner, "
-                                                 "p.puballtables, false AS 
p.puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, 
false AS p.pubsequence, false AS pubviaroot "
+                                                 "p.puballtables, false AS 
puballsequences, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, 
false AS pubsequence, false AS pubviaroot "
                                                  "FROM pg_publication p");
 
        res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
diff --git a/src/test/regress/expected/publication.out 
b/src/test/regress/expected/publication.out
index 620fab87e6..92c50b13ec 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -262,7 +262,7 @@ Sequences from schemas:
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test;
 RESET client_min_messages;
--- fail - can't create publication with schema and table of the same schema
+-- fail - can't create publication with schema and sequence of the same schema
 CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA 
pub_test, SEQUENCE pub_test.testpub_seq1;
 ERROR:  cannot add relation "pub_test.testpub_seq1" to publication
 DETAIL:  Sequence's schema "pub_test" is already part of the publication or 
part of the specified schema list.
diff --git a/src/test/regress/sql/object_address.sql 
b/src/test/regress/sql/object_address.sql
index f90afad804..2f40156eb4 100644
--- a/src/test/regress/sql/object_address.sql
+++ b/src/test/regress/sql/object_address.sql
@@ -143,6 +143,7 @@ CREATE STATISTICS addr_nsp.gentable_stat ON a, b FROM 
addr_nsp.gentable;
 SELECT pg_get_object_address('publication', '{one,two}', '{}');
 SELECT pg_get_object_address('subscription', '{one}', '{}');
 SELECT pg_get_object_address('subscription', '{one,two}', '{}');
+
 -- test successful cases
 WITH objects (type, name, args) AS (VALUES
                                ('table', '{addr_nsp, gentable}'::text[], 
'{}'::text[]),
diff --git a/src/test/regress/sql/publication.sql 
b/src/test/regress/sql/publication.sql
index af665395e1..5043c4bbba 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -144,7 +144,7 @@ CREATE PUBLICATION testpub_forsequence FOR SEQUENCE 
testpub_seq0;
 SET client_min_messages = 'ERROR';
 CREATE PUBLICATION testpub_forschema FOR ALL SEQUENCES IN SCHEMA pub_test;
 RESET client_min_messages;
--- fail - can't create publication with schema and table of the same schema
+-- fail - can't create publication with schema and sequence of the same schema
 CREATE PUBLICATION testpub_for_seq_schema FOR ALL SEQUENCES IN SCHEMA 
pub_test, SEQUENCE pub_test.testpub_seq1;
 -- fail - can't add a sequence of the same schema to the schema publication
 ALTER PUBLICATION testpub_forschema ADD SEQUENCE pub_test.testpub_seq1;
diff --git a/src/test/subscription/t/029_sequences.pl 
b/src/test/subscription/t/029_sequences.pl
index cdd7f7f344..9ae3c03d7d 100644
--- a/src/test/subscription/t/029_sequences.pl
+++ b/src/test/subscription/t/029_sequences.pl
@@ -46,7 +46,7 @@
        "ALTER PUBLICATION seq_pub ADD SEQUENCE s");
 
 $node_subscriber->safe_psql('postgres',
-       "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' 
PUBLICATION seq_pub WITH (slot_name = seq_sub_slot)"
+       "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' 
PUBLICATION seq_pub"
 );
 
 $node_publisher->wait_for_catchup('seq_sub');
@@ -73,7 +73,7 @@
 ));
 
 is( $result, '132|0|t',
-       'check replicated sequence values on subscriber');
+       'initial test data replicated');
 
 
 # advance the sequence in a rolled-back transaction - the rollback
@@ -96,7 +96,7 @@
 ));
 
 is( $result, '231|0|t',
-       'check replicated sequence values on subscriber');
+       'advance sequence in rolled-back transaction');
 
 
 # create a new sequence and roll it back - should not be replicated, due to
@@ -119,7 +119,7 @@
 ));
 
 is( $result, '1|0|f',
-       'check replicated sequence values on subscriber');
+       'create new sequence and roll it back');
 
 
 # create a new sequence, advance it in a rolled-back transaction, but commit
@@ -150,7 +150,7 @@
 ));
 
 is( $result, '132|0|t',
-       'check replicated sequence values on subscriber');
+       'create sequence, advance it in rolled-back transaction, but commit the 
create');
 
 
 # advance the new sequence in a transaction, and roll it back - the rollback
@@ -173,7 +173,7 @@
 ));
 
 is( $result, '231|0|t',
-       'check replicated sequence values on subscriber');
+       'advance the new sequence in a transaction and roll it back');
 
 
 # advance the sequence in a subtransaction - the subtransaction gets rolled
@@ -196,7 +196,7 @@
 ));
 
 is( $result, '330|0|t',
-       'check replicated sequence values on subscriber');
+       'advance sequence in a subtransaction');
 
 
 done_testing();
-- 
2.35.1
