On 2/10/22 19:17, Tomas Vondra wrote:
> I've polished & pushed the first part adding sequence decoding
> infrastructure etc. Attached are the two remaining parts.
>
> I plan to wait a day or two and then push the test_decoding part. The
> last part (for built-in replication) will need more work and maybe
> rethinking the grammar etc.
>
I've pushed the second part, adding sequences to test_decoding.
Here's the remaining part, rebased, with a small tweak in the TAP test
to eliminate the issue with not waiting for sequence increments. I've
kept the tweak in a separate patch, so that we can throw it away easily
if we happen to resolve the issue.
regards
--
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From c456270469cdb6ad769455eb3d16aea6db2c02af Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Thu, 10 Feb 2022 15:18:59 +0100
Subject: [PATCH 1/2] Add support for decoding sequences to built-in
replication
---
doc/src/sgml/catalogs.sgml | 71 ++++
doc/src/sgml/ref/alter_publication.sgml | 24 +-
doc/src/sgml/ref/alter_subscription.sgml | 4 +-
src/backend/catalog/pg_publication.c | 149 ++++++++-
src/backend/catalog/system_views.sql | 10 +
src/backend/commands/publicationcmds.c | 350 +++++++++++++++++++-
src/backend/commands/sequence.c | 79 +++++
src/backend/commands/subscriptioncmds.c | 272 +++++++++++++++
src/backend/executor/execReplication.c | 2 +-
src/backend/nodes/copyfuncs.c | 1 +
src/backend/nodes/equalfuncs.c | 1 +
src/backend/parser/gram.y | 32 ++
src/backend/replication/logical/proto.c | 52 +++
src/backend/replication/logical/tablesync.c | 118 ++++++-
src/backend/replication/logical/worker.c | 60 ++++
src/backend/replication/pgoutput/pgoutput.c | 85 ++++-
src/backend/utils/cache/relcache.c | 4 +-
src/bin/psql/tab-complete.c | 14 +-
src/include/catalog/pg_proc.dat | 5 +
src/include/catalog/pg_publication.h | 14 +
src/include/commands/sequence.h | 1 +
src/include/nodes/parsenodes.h | 6 +
src/include/replication/logicalproto.h | 19 ++
src/include/replication/pgoutput.h | 1 +
src/test/regress/expected/rules.out | 8 +
src/test/subscription/t/028_sequences.pl | 196 +++++++++++
26 files changed, 1542 insertions(+), 36 deletions(-)
create mode 100644 src/test/subscription/t/028_sequences.pl
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 879d2dbce03..271dc03e5a2 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9540,6 +9540,11 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l
<entry>prepared transactions</entry>
</row>
+ <row>
+ <entry><link linkend="view-pg-publication-sequences"><structname>pg_publication_sequences</structname></link></entry>
+ <entry>publications and their associated sequences</entry>
+ </row>
+
<row>
<entry><link linkend="view-pg-publication-tables"><structname>pg_publication_tables</structname></link></entry>
<entry>publications and their associated tables</entry>
@@ -11375,6 +11380,72 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
</sect1>
+ <sect1 id="view-pg-publication-sequences">
+ <title><structname>pg_publication_sequences</structname></title>
+
+ <indexterm zone="view-pg-publication-sequences">
+ <primary>pg_publication_sequences</primary>
+ </indexterm>
+
+ <para>
+ The view <structname>pg_publication_sequences</structname> provides
+ information about the mapping between publications and the sequences they
+ contain. Unlike the underlying catalog
+ <link linkend="catalog-pg-publication-rel"><structname>pg_publication_rel</structname></link>,
+ this view expands
+ publications defined as <literal>FOR ALL SEQUENCES</literal>, so for such
+ publications there will be a row for each eligible sequence.
+ </para>
+
+ <table>
+ <title><structname>pg_publication_sequences</structname> Columns</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ Column Type
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>pubname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-publication"><structname>pg_publication</structname></link>.<structfield>pubname</structfield>)
+ </para>
+ <para>
+ Name of publication
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>schemaname</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-namespace"><structname>pg_namespace</structname></link>.<structfield>nspname</structfield>)
+ </para>
+ <para>
+ Name of schema containing sequence
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>sequencename</structfield> <type>name</type>
+ (references <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>relname</structfield>)
+ </para>
+ <para>
+ Name of sequence
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+ </sect1>
+
<sect1 id="view-pg-publication-tables">
<title><structname>pg_publication_tables</structname></title>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index 7c7c27bf7ce..9da8274ae2c 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -31,7 +31,9 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
<phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+ SEQUENCE <replaceable class="parameter">sequence_name</replaceable> [ * ] [, ... ]
ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
+ ALL SEQUENCES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
</synopsis>
</refsynopsisdiv>
@@ -56,7 +58,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</para>
<para>
- The fourth variant of this command listed in the synopsis can change
+ The next three variants change which sequences are part of the publication.
+ The <literal>SET SEQUENCE</literal> clause will replace the list of sequences
+ in the publication with the specified one. The <literal>ADD SEQUENCE</literal>
+ and <literal>DROP SEQUENCE</literal> clauses will add and remove one or more
+ sequences from the publication. Note that adding sequences to a publication
+ that is already subscribed to will require a <literal>ALTER SUBSCRIPTION
+ ... REFRESH PUBLICATION</literal> action on the subscribing side in order
+ to become effective.
+ </para>
+
+ <para>
+ The seventh variant of this command listed in the synopsis can change
all of the publication properties specified in
<xref linkend="sql-createpublication"/>. Properties not mentioned in the
command retain their previous settings.
@@ -123,6 +136,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><replaceable class="parameter">sequence_name</replaceable></term>
+ <listitem>
+ <para>
+ Name of an existing sequence.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
<listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0b027cc3462..8f28cf03f40 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -147,7 +147,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<listitem>
<para>
Fetch missing table information from publisher. This will start
- replication of tables that were added to the subscribed-to publications
+ replication of tables and sequences that were added to the subscribed-to publications
since <command>CREATE SUBSCRIPTION</command> or
the last invocation of <command>REFRESH PUBLICATION</command>.
</para>
@@ -164,7 +164,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
Specifies whether to copy pre-existing data in the publications
that are being subscribed to when the replication starts.
The default is <literal>true</literal>. (Previously-subscribed
- tables are not copied.)
+ tables and sequences are not copied.)
</para>
</listitem>
</varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index e14ca2f5630..1a9e05ba98b 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -54,7 +54,8 @@ check_publication_add_relation(Relation targetrel)
{
/* Must be a regular or partitioned table */
if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
- RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
+ RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE &&
+ RelationGetForm(targetrel)->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot add relation \"%s\" to publication",
@@ -131,7 +132,8 @@ static bool
is_publishable_class(Oid relid, Form_pg_class reltuple)
{
return (reltuple->relkind == RELKIND_RELATION ||
- reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
+ reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
+ reltuple->relkind == RELKIND_SEQUENCE) &&
!IsCatalogRelationOid(relid) &&
reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
relid >= FirstNormalObjectId;
@@ -503,6 +505,11 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Form_pg_publication_rel pubrel;
pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+ /* skip sequences here */
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ continue;
+
result = GetPubPartitionOptionRelations(result, pub_partopt,
pubrel->prrelid);
}
@@ -517,6 +524,49 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
return result;
}
+/*
+ * Gets list of relation oids for a publication (sequences only).
+ *
+ * This should only be used for normal publications, the FOR ALL TABLES
+ * should use GetAllSequencesPublicationRelations().
+ */
+List *
+GetPublicationSequenceRelations(Oid pubid)
+{
+ List *result;
+ Relation pubrelsrel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ /* Find all publications associated with the relation. */
+ pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_publication_rel_prpubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(pubid));
+
+ scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId,
+ true, NULL, 1, &scankey);
+
+ result = NIL;
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_publication_rel pubrel;
+
+ pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
+
+ if (get_rel_relkind(pubrel->prrelid) == RELKIND_SEQUENCE)
+ result = lappend_oid(result, pubrel->prrelid);
+ }
+
+ systable_endscan(scan);
+ table_close(pubrelsrel, AccessShareLock);
+
+ return result;
+}
+
/*
* Gets list of publication oids for publications marked as FOR ALL TABLES.
*/
@@ -762,6 +812,46 @@ GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
return result;
}
+/*
+ * Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
+ */
+List *
+GetAllSequencesPublicationRelations(void)
+{
+ Relation classRel;
+ ScanKeyData key[1];
+ TableScanDesc scan;
+ HeapTuple tuple;
+ List *result = NIL;
+
+ classRel = table_open(RelationRelationId, AccessShareLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_class_relkind,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(RELKIND_SEQUENCE));
+
+ scan = table_beginscan_catalog(classRel, 1, key);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+ Oid relid = relForm->oid;
+
+ if (is_publishable_class(relid, relForm))
+ result = lappend_oid(result, relid);
+ }
+
+ table_endscan(scan);
+
+ table_close(classRel, AccessShareLock);
+ return result;
+}
+
/*
* Get publication using oid
*
@@ -784,10 +874,12 @@ GetPublication(Oid pubid)
pub->oid = pubid;
pub->name = pstrdup(NameStr(pubform->pubname));
pub->alltables = pubform->puballtables;
+ pub->allsequences = pubform->puballsequences;
pub->pubactions.pubinsert = pubform->pubinsert;
pub->pubactions.pubupdate = pubform->pubupdate;
pub->pubactions.pubdelete = pubform->pubdelete;
pub->pubactions.pubtruncate = pubform->pubtruncate;
+ pub->pubactions.pubsequence = pubform->pubsequence;
pub->pubviaroot = pubform->pubviaroot;
ReleaseSysCache(tup);
@@ -937,3 +1029,56 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+
+/*
+ * Returns Oids of sequences in a publication.
+ */
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ Publication *publication;
+ List *sequences;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /* switch to memory context appropriate for multiple function calls */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ publication = GetPublicationByName(pubname, false);
+
+ /*
+ * Publications support partitioned tables, although all changes are
+ * replicated using leaf partition identity and schema, so we only
+ * need those.
+ */
+ if (publication->allsequences)
+ sequences = GetAllSequencesPublicationRelations();
+ else
+ sequences = GetPublicationSequenceRelations(publication->oid);
+
+ funcctx->user_fctx = (void *) sequences;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /* stuff done on every call of the function */
+ funcctx = SRF_PERCALL_SETUP();
+ sequences = (List *) funcctx->user_fctx;
+
+ if (funcctx->call_cntr < list_length(sequences))
+ {
+ Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
+
+ SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
+ }
+
+ SRF_RETURN_DONE(funcctx);
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 3cb69b1f87b..b5cc33aca34 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -374,6 +374,16 @@ CREATE VIEW pg_publication_tables AS
pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE C.oid = GPT.relid;
+CREATE VIEW pg_publication_sequences AS
+ SELECT
+ P.pubname AS pubname,
+ N.nspname AS schemaname,
+ C.relname AS sequencename
+ FROM pg_publication P,
+ LATERAL pg_get_publication_sequences(P.pubname) GPT,
+ pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+ WHERE C.oid = GPT.relid;
+
CREATE VIEW pg_locks AS
SELECT * FROM pg_lock_status() AS L;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 0e4bb97fb73..3bc2e8ccb66 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -16,6 +16,7 @@
#include "access/genam.h"
#include "access/htup_details.h"
+#include "access/relation.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
@@ -59,6 +60,12 @@ static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
AlterPublicationStmt *stmt);
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
+static List *OpenSequenceList(List *sequences);
+static void CloseSequenceList(List *rels);
+static void PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt);
+static void PublicationDropSequences(Oid pubid, List *rels, bool missing_ok);
+
static void
parse_publication_options(ParseState *pstate,
List *options,
@@ -77,6 +84,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubupdate = true;
pubactions->pubdelete = true;
pubactions->pubtruncate = true;
+ pubactions->pubsequence = true;
*publish_via_partition_root = false;
/* Parse options */
@@ -101,6 +109,7 @@ parse_publication_options(ParseState *pstate,
pubactions->pubupdate = false;
pubactions->pubdelete = false;
pubactions->pubtruncate = false;
+ pubactions->pubsequence = false;
*publish_given = true;
publish = defGetString(defel);
@@ -123,6 +132,8 @@ parse_publication_options(ParseState *pstate,
pubactions->pubdelete = true;
else if (strcmp(publish_opt, "truncate") == 0)
pubactions->pubtruncate = true;
+ else if (strcmp(publish_opt, "sequence") == 0)
+ pubactions->pubsequence = true;
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -149,7 +160,9 @@ parse_publication_options(ParseState *pstate,
*/
static void
ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
- List **rels, List **schemas)
+ List **tables, List **sequences,
+ List **tables_schemas, List **sequences_schemas,
+ List **schemas)
{
ListCell *cell;
PublicationObjSpec *pubobj;
@@ -167,12 +180,23 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
switch (pubobj->pubobjtype)
{
case PUBLICATIONOBJ_TABLE:
- *rels = lappend(*rels, pubobj->pubtable);
+ *tables = lappend(*tables, pubobj->pubtable);
+ break;
+ case PUBLICATIONOBJ_SEQUENCE:
+ *sequences = lappend(*sequences, pubobj->pubtable);
break;
case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
schemaid = get_namespace_oid(pubobj->name, false);
/* Filter out duplicates if user specifies "sch1, sch1" */
+ *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid);
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ case PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA:
+ schemaid = get_namespace_oid(pubobj->name, false);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid);
*schemas = list_append_unique_oid(*schemas, schemaid);
break;
case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
@@ -186,6 +210,21 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
list_free(search_path);
/* Filter out duplicates if user specifies "sch1, sch1" */
+ *tables_schemas = list_append_unique_oid(*tables_schemas, schemaid);
+ *schemas = list_append_unique_oid(*schemas, schemaid);
+ break;
+ case PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA:
+ search_path = fetch_search_path(false);
+ if (search_path == NIL) /* nothing valid in search_path? */
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_SCHEMA),
+ errmsg("no schema has been selected for CURRENT_SCHEMA"));
+
+ schemaid = linitial_oid(search_path);
+ list_free(search_path);
+
+ /* Filter out duplicates if user specifies "sch1, sch1" */
+ *sequences_schemas = list_append_unique_oid(*sequences_schemas, schemaid);
*schemas = list_append_unique_oid(*schemas, schemaid);
break;
default:
@@ -251,7 +290,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
bool publish_via_partition_root_given;
bool publish_via_partition_root;
AclResult aclresult;
- List *relations = NIL;
+ List *tables = NIL;
+ List *sequences = NIL;
+ List *tables_schemaidlist = NIL;
+ List *sequences_schemaidlist = NIL;
List *schemaidlist = NIL;
/* must have CREATE privilege on database */
@@ -306,6 +348,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
BoolGetDatum(pubactions.pubdelete);
values[Anum_pg_publication_pubtruncate - 1] =
BoolGetDatum(pubactions.pubtruncate);
+ values[Anum_pg_publication_pubsequence - 1] =
+ BoolGetDatum(pubactions.pubsequence);
values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
@@ -330,26 +374,40 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
}
else
{
- ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate,
+ &tables, &sequences,
+ &tables_schemaidlist,
+ &sequences_schemaidlist,
&schemaidlist);
/* FOR ALL TABLES IN SCHEMA requires superuser */
- if (list_length(schemaidlist) > 0 && !superuser())
+ if (list_length(tables_schemaidlist) > 0 && !superuser())
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
- if (list_length(relations) > 0)
+ if (list_length(tables) > 0)
{
List *rels;
- rels = OpenTableList(relations);
- CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ rels = OpenTableList(tables);
+ CheckObjSchemaNotAlreadyInPublication(rels, tables_schemaidlist,
PUBLICATIONOBJ_TABLE);
PublicationAddTables(puboid, rels, true, NULL);
CloseTableList(rels);
}
+ if (list_length(sequences) > 0)
+ {
+ List *rels;
+
+ rels = OpenSequenceList(sequences);
+ CheckObjSchemaNotAlreadyInPublication(rels, sequences_schemaidlist,
+ PUBLICATIONOBJ_SEQUENCE);
+ PublicationAddTables(puboid, rels, true, NULL);
+ CloseSequenceList(rels);
+ }
+
if (list_length(schemaidlist) > 0)
{
/*
@@ -653,12 +711,13 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
*/
static void
CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
- List *tables, List *schemaidlist)
+ List *tables, List *tables_schemaidlist,
+ List *sequences, List *sequences_schemaidlist)
{
Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
- schemaidlist && !superuser())
+ (tables_schemaidlist || sequences_schemaidlist) && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to add or set schemas")));
@@ -667,13 +726,24 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
* Check that user is allowed to manipulate the publication tables in
* schema
*/
- if (schemaidlist && pubform->puballtables)
+ if (tables_schemaidlist && pubform->puballtables)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
NameStr(pubform->pubname)),
errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")));
+ /*
+ * Check that user is allowed to manipulate the publication sequences in
+ * schema
+ */
+ if (sequences_schemaidlist && pubform->puballsequences)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+ NameStr(pubform->pubname)),
+ errdetail("Sequences from schema cannot be added to, dropped from, or set on FOR ALL SEQUENCES publications.")));
+
/* Check that user is allowed to manipulate the publication tables. */
if (tables && pubform->puballtables)
ereport(ERROR,
@@ -681,6 +751,108 @@ CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
errmsg("publication \"%s\" is defined as FOR ALL TABLES",
NameStr(pubform->pubname)),
errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+ /* Check that user is allowed to manipulate the publication tables. */
+ if (sequences && pubform->puballsequences)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
+ NameStr(pubform->pubname)),
+ errdetail("Sequences cannot be added to or dropped from FOR ALL SEQUENCES publications.")));
+}
+
+/*
+ * Add or remove sequence to/from publication.
+ */
+static void
+AlterPublicationSequences(AlterPublicationStmt *stmt, HeapTuple tup,
+ List *sequences, List *schemaidlist)
+{
+ List *rels = NIL;
+ Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
+ Oid pubid = pubform->oid;
+
+ /*
+ * It is quite possible that for the SET case user has not specified any
+ * tables in which case we need to remove all the existing tables.
+ */
+ if (!sequences && stmt->action != AP_SetObjects)
+ return;
+
+ rels = OpenSequenceList(sequences);
+
+ if (stmt->action == AP_AddObjects)
+ {
+ List *schemas = NIL;
+
+ /*
+ * Check if the relation is member of the existing schema in the
+ * publication or member of the schema list specified.
+ */
+ schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
+ CheckObjSchemaNotAlreadyInPublication(rels, schemas,
+ PUBLICATIONOBJ_SEQUENCE);
+ PublicationAddSequences(pubid, rels, false, stmt);
+ }
+ else if (stmt->action == AP_DropObjects)
+ PublicationDropSequences(pubid, rels, false);
+ else /* DEFELEM_SET */
+ {
+ List *oldrelids = GetPublicationRelations(pubid,
+ PUBLICATION_PART_ROOT);
+ List *delrels = NIL;
+ ListCell *oldlc;
+
+ CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
+ PUBLICATIONOBJ_SEQUENCE);
+
+ /* Calculate which relations to drop. */
+ foreach(oldlc, oldrelids)
+ {
+ Oid oldrelid = lfirst_oid(oldlc);
+ ListCell *newlc;
+ bool found = false;
+
+ foreach(newlc, rels)
+ {
+ PublicationRelInfo *newpubrel;
+
+ newpubrel = (PublicationRelInfo *) lfirst(newlc);
+ if (RelationGetRelid(newpubrel->relation) == oldrelid)
+ {
+ found = true;
+ break;
+ }
+ }
+ /* Not yet in the list, open it and add to the list */
+ if (!found)
+ {
+ Relation oldrel;
+ PublicationRelInfo *pubrel;
+
+ /* Wrap relation into PublicationRelInfo */
+ oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
+
+ pubrel = palloc(sizeof(PublicationRelInfo));
+ pubrel->relation = oldrel;
+
+ delrels = lappend(delrels, pubrel);
+ }
+ }
+
+ /* And drop them. */
+ PublicationDropSequences(pubid, delrels, true);
+
+ /*
+ * Don't bother calculating the difference for adding, we'll catch and
+ * skip existing ones when doing catalog update.
+ */
+ PublicationAddSequences(pubid, rels, true, stmt);
+
+ CloseSequenceList(delrels);
+ }
+
+ CloseSequenceList(rels);
}
/*
@@ -718,13 +890,21 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
AlterPublicationOptions(pstate, stmt, rel, tup);
else
{
- List *relations = NIL;
+ List *tables = NIL;
+ List *sequences = NIL;
+ List *tables_schemaidlist = NIL;
+ List *sequences_schemaidlist = NIL;
List *schemaidlist = NIL;
- ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
+ ObjectsInPublicationToOids(stmt->pubobjects, pstate,
+ &tables, &sequences,
+ &tables_schemaidlist,
+ &sequences_schemaidlist,
&schemaidlist);
- CheckAlterPublication(stmt, tup, relations, schemaidlist);
+ CheckAlterPublication(stmt, tup,
+ tables, tables_schemaidlist,
+ sequences, sequences_schemaidlist);
/*
* Lock the publication so nobody else can do anything with it. This
@@ -749,7 +929,9 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
errmsg("publication \"%s\" does not exist",
stmt->pubname));
- AlterPublicationTables(stmt, tup, relations, schemaidlist);
+ AlterPublicationTables(stmt, tup, tables, tables_schemaidlist);
+ AlterPublicationSequences(stmt, tup, sequences, sequences_schemaidlist);
+
AlterPublicationSchemas(stmt, tup, schemaidlist);
}
@@ -1157,6 +1339,144 @@ PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
}
}
+/*
+ * Open relations specified by a PublicationTable list.
+ * In the returned list of PublicationRelInfo, tables are locked
+ * in ShareUpdateExclusiveLock mode in order to add them to a publication.
+ */
+static List *
+OpenSequenceList(List *sequences)
+{
+ List *relids = NIL;
+ List *rels = NIL;
+ ListCell *lc;
+
+ /*
+ * Open, share-lock, and check all the explicitly-specified relations
+ */
+ foreach(lc, sequences)
+ {
+ PublicationTable *s = lfirst_node(PublicationTable, lc);
+ Relation rel;
+ Oid myrelid;
+ PublicationRelInfo *pub_rel;
+
+ /* Allow query cancel in case this takes a long time */
+ CHECK_FOR_INTERRUPTS();
+
+ rel = table_openrv(s->relation, ShareUpdateExclusiveLock);
+ myrelid = RelationGetRelid(rel);
+
+ /*
+ * Filter out duplicates if user specifies "foo, foo".
+ *
+ * Note that this algorithm is known to not be very efficient (O(N^2))
+ * but given that it only works on list of tables given to us by user
+ * it's deemed acceptable.
+ */
+ if (list_member_oid(relids, myrelid))
+ {
+ table_close(rel, ShareUpdateExclusiveLock);
+ continue;
+ }
+
+ pub_rel = palloc(sizeof(PublicationRelInfo));
+ pub_rel->relation = rel;
+ rels = lappend(rels, pub_rel);
+ relids = lappend_oid(relids, myrelid);
+ }
+
+ list_free(relids);
+
+ return rels;
+}
+
+/*
+ * Close all relations in the list.
+ */
+static void
+CloseSequenceList(List *rels)
+{
+ ListCell *lc;
+
+ foreach(lc, rels)
+ {
+ PublicationRelInfo *pub_rel;
+
+ pub_rel = (PublicationRelInfo *) lfirst(lc);
+ table_close(pub_rel->relation, NoLock);
+ }
+}
+
+/*
+ * Add listed tables to the publication.
+ */
+static void
+PublicationAddSequences(Oid pubid, List *rels, bool if_not_exists,
+ AlterPublicationStmt *stmt)
+{
+ ListCell *lc;
+
+ Assert(!stmt || !stmt->for_all_sequences);
+
+ foreach(lc, rels)
+ {
+ PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
+ Relation rel = pub_rel->relation;
+ ObjectAddress obj;
+
+ /* Must be owner of the sequence or superuser. */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
+ RelationGetRelationName(rel));
+
+ obj = publication_add_relation(pubid, pub_rel, if_not_exists);
+ if (stmt)
+ {
+ EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+ (Node *) stmt);
+
+ InvokeObjectPostCreateHook(PublicationRelRelationId,
+ obj.objectId, 0);
+ }
+ }
+}
+
+/*
+ * Remove listed sequences from the publication.
+ */
+static void
+PublicationDropSequences(Oid pubid, List *rels, bool missing_ok)
+{
+ ObjectAddress obj;
+ ListCell *lc;
+ Oid prid;
+
+ foreach(lc, rels)
+ {
+ Relation rel = (Relation) lfirst(lc);
+ Oid relid = RelationGetRelid(rel);
+
+ prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
+ ObjectIdGetDatum(relid),
+ ObjectIdGetDatum(pubid));
+ if (!OidIsValid(prid))
+ {
+ if (missing_ok)
+ continue;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("relation \"%s\" is not part of the publication",
+ RelationGetRelationName(rel))));
+ }
+
+ ObjectAddressSet(obj, PublicationRelRelationId, prid);
+ performDeletion(&obj, DROP_CASCADE, 0);
+ }
+}
+
+
/*
* Internal workhorse for changing a publication owner
*/
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index ab592ce2f15..fe4f21ec438 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -336,6 +336,85 @@ ResetSequence(Oid seq_relid)
relation_close(seq_rel, NoLock);
}
+/*
+ * Reset a sequence to its initial value.
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction. Caller is also
+ * responsible for permissions checking.
+ */
+void
+ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+ Relation seq_rel;
+ SeqTable elm;
+ Form_pg_sequence_data seq;
+ Buffer buf;
+ HeapTupleData seqdatatuple;
+ HeapTuple tuple;
+
+ /*
+ * Read the old sequence. This does a bit more work than really
+ * necessary, but it's simple, and we do want to double-check that it's
+ * indeed a sequence.
+ */
+ init_sequence(seq_relid, &elm, &seq_rel);
+ (void) read_seq_tuple(seq_rel, &buf, &seqdatatuple);
+
+ /*
+ * Copy the existing sequence tuple.
+ */
+ tuple = heap_copytuple(&seqdatatuple);
+
+ /* Now we're done with the old page */
+ UnlockReleaseBuffer(buf);
+
+ /*
+ * Modify the copied tuple to execute the restart (compare the RESTART
+ * action in AlterSequence)
+ */
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+ seq->last_value = last_value;
+ seq->is_called = is_called;
+ seq->log_cnt = log_cnt;
+
+ /*
+ * Create a new storage file for the sequence.
+ */
+ RelationSetNewRelfilenode(seq_rel, seq_rel->rd_rel->relpersistence);
+
+ /*
+ * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+ * unfrozen XIDs. Same with relminmxid, since a sequence will never
+ * contain multixacts.
+ */
+ Assert(seq_rel->rd_rel->relfrozenxid == InvalidTransactionId);
+ Assert(seq_rel->rd_rel->relminmxid == InvalidMultiXactId);
+
+ /*
+ * Insert the modified tuple into the new storage file.
+ *
+ * XXX Maybe this should also use created=true, just like the other places
+ * calling fill_seq_with_data. That's probably needed for correct cascading
+ * replication.
+ *
+ * XXX That'd mean all fill_seq_with_data callers use created=true, making
+ * the parameter unnecessary.
+ */
+ fill_seq_with_data(seq_rel, tuple);
+
+ /* Clear local cache so that we don't think we have cached numbers */
+ /* Note that we do not change the currval() state */
+ elm->cached = elm->last;
+
+ relation_close(seq_rel, NoLock);
+}
+
/*
* Initialize a sequence's relation with the specified tuple as content
*/
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d246..5beb67e7652 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -85,6 +85,7 @@ typedef struct SubOpts
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -496,6 +497,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
char *err;
WalReceiverConn *wrconn;
List *tables;
+ List *sequences;
ListCell *lc;
char table_state;
@@ -534,6 +536,26 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
InvalidXLogRecPtr);
}
+ /*
+ * Get the sequence list from publisher and build local sequence
+ * status info.
+ */
+ sequences = fetch_sequence_list(wrconn, publications);
+ foreach(lc, sequences)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ AddSubscriptionRelState(subid, relid, table_state,
+ InvalidXLogRecPtr);
+ }
+
/*
* If requested, create permanent slot for the subscription. We
* won't use the initial snapshot for anything, so no need to
@@ -706,6 +728,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
{
Oid relid = subrel_local_oids[off];
+ /* XXX ignore sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) == RELKIND_SEQUENCE)
+ continue;
+
if (!bsearch(&relid, pubrel_local_oids,
list_length(pubrel_names), sizeof(Oid), oid_cmp))
{
@@ -797,6 +823,183 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
}
}
+
+ /*
+ * XXX now do the same thing for sequences, maybe before the preceding
+ * block, or earlier?
+ */
+
+ /* Get the table list from publisher. */
+ pubrel_names = fetch_sequence_list(wrconn, sub->publications);
+
+ /* Get local table list. */
+ subrel_states = GetSubscriptionRelations(sub->oid);
+
+ /*
+ * Build qsorted array of local table oids for faster lookup. This can
+ * potentially contain all tables in the database so speed of lookup
+ * is important.
+ */
+ subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ off = 0;
+ foreach(lc, subrel_states)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+ subrel_local_oids[off++] = relstate->relid;
+ }
+ qsort(subrel_local_oids, list_length(subrel_states),
+ sizeof(Oid), oid_cmp);
+
+ /*
+ * Rels that we want to remove from subscription and drop any slots
+ * and origins corresponding to them.
+ */
+ sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+
+ /*
+ * Walk over the remote tables and try to match them to locally known
+ * tables. If the table is not known locally create a new state for
+ * it.
+ *
+ * Also builds array of local oids of remote tables for the next step.
+ */
+ off = 0;
+ pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+ foreach(lc, pubrel_names)
+ {
+ RangeVar *rv = (RangeVar *) lfirst(lc);
+ Oid relid;
+
+ relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+ /* Check for supported relkind. */
+ CheckSubscriptionRelkind(get_rel_relkind(relid),
+ rv->schemaname, rv->relname);
+
+ pubrel_local_oids[off++] = relid;
+
+ if (!bsearch(&relid, subrel_local_oids,
+ list_length(subrel_states), sizeof(Oid), oid_cmp))
+ {
+ AddSubscriptionRelState(sub->oid, relid,
+ copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+ InvalidXLogRecPtr);
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
+ rv->schemaname, rv->relname, sub->name)));
+ }
+ }
+
+ /*
+ * Next remove state for tables we should not care about anymore using
+ * the data we collected above
+ */
+ qsort(pubrel_local_oids, list_length(pubrel_names),
+ sizeof(Oid), oid_cmp);
+
+ remove_rel_len = 0;
+ for (off = 0; off < list_length(subrel_states); off++)
+ {
+ Oid relid = subrel_local_oids[off];
+
+ /* XXX ignore non-sequences - maybe do this in GetSubscriptionRelations? */
+ if (get_rel_relkind(relid) != RELKIND_SEQUENCE)
+ continue;
+
+ if (!bsearch(&relid, pubrel_local_oids,
+ list_length(pubrel_names), sizeof(Oid), oid_cmp))
+ {
+ char state;
+ XLogRecPtr statelsn;
+
+ /*
+ * Lock pg_subscription_rel with AccessExclusiveLock to
+ * prevent any race conditions with the apply worker
+ * re-launching workers at the same time this code is trying
+ * to remove those tables.
+ *
+ * Even if new worker for this particular rel is restarted it
+ * won't be able to make any progress as we hold exclusive
+ * lock on subscription_rel till the transaction end. It will
+ * simply exit as there is no corresponding rel entry.
+ *
+ * This locking also ensures that the state of rels won't
+ * change till we are done with this refresh operation.
+ */
+ if (!rel)
+ rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
+ /* Last known rel state. */
+ state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
+
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;
+
+ RemoveSubscriptionRel(sub->oid, relid);
+
+ logicalrep_worker_stop(sub->oid, relid);
+
+ /*
+ * For READY state, we would have already dropped the
+ * tablesync origin.
+ */
+ if (state != SUBREL_STATE_READY)
+ {
+ char originname[NAMEDATALEN];
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * It is possible that the origin is not yet created for
+ * tablesync worker, this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
+ * concurrently try to drop the origin and by this time
+ * the origin might be already removed. For these reasons,
+ * passing missing_ok = true.
+ */
+ ReplicationOriginNameForTablesync(sub->oid, relid, originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
+ }
+
+ ereport(DEBUG1,
+ (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
+ get_namespace_name(get_rel_namespace(relid)),
+ get_rel_name(relid),
+ sub->name)));
+ }
+ }
+
+ /*
+ * Drop the tablesync slots associated with removed tables. This has
+ * to be at the end because otherwise if there is an error while doing
+ * the database operations we won't be able to rollback dropped slots.
+ */
+ for (off = 0; off < remove_rel_len; off++)
+ {
+ if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ {
+ char syncslotname[NAMEDATALEN] = {0};
+
+ /*
+ * For READY/SYNCDONE states we know the tablesync slot has
+ * already been dropped by the tablesync worker.
+ *
+ * For other states, there is no certainty, maybe the slot
+ * does not exist yet. Also, if we fail after removing some of
+ * the slots, next time, it will again try to drop already
+ * dropped slots and fail. For these reasons, we allow
+ * missing_ok = true for the drop.
+ */
+ ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
+ syncslotname, sizeof(syncslotname));
+ ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
+ }
+ }
+
}
PG_FINALLY();
{
@@ -1616,6 +1819,75 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
return tablelist;
}
+/*
+ * Get the list of sequences which belong to specified publications on the
+ * publisher connection.
+ */
+static List *
+fetch_sequence_list(WalReceiverConn *wrconn, List *publications)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[2] = {TEXTOID, TEXTOID};
+ ListCell *lc;
+ bool first;
+ List *tablelist = NIL;
+
+ Assert(list_length(publications) > 0);
+
+ initStringInfo(&cmd);
+ appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n"
+ " FROM pg_catalog.pg_publication_sequences s\n"
+ " WHERE s.pubname IN (");
+ first = true;
+ foreach(lc, publications)
+ {
+ char *pubname = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+ }
+ appendStringInfoChar(&cmd, ')');
+
+ res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process tables. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ char *nspname;
+ char *relname;
+ bool isnull;
+ RangeVar *rv;
+
+ nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+ relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ rv = makeRangeVar(nspname, relname, -1);
+ tablelist = lappend(tablelist, rv);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+
+ return tablelist;
+}
+
/*
* This is to report the connection failure while dropping replication slots.
* Here, we report the WARNING for all tablesync slots so that user can drop
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 313c87398b2..78f14119d98 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -608,7 +608,7 @@ void
CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname)
{
- if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 6bd95bbce24..8b7e9710401 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4852,6 +4852,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from)
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(pubobjects);
COPY_SCALAR_FIELD(for_all_tables);
+ COPY_SCALAR_FIELD(for_all_sequences);
return newnode;
}
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 4126516222b..55a7dbbddf3 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2337,6 +2337,7 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a,
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(pubobjects);
COMPARE_SCALAR_FIELD(for_all_tables);
+ COMPARE_SCALAR_FIELD(for_all_sequences);
COMPARE_SCALAR_FIELD(action);
return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c4f32425060..bbde765cf56 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9771,6 +9771,26 @@ PublicationObjSpec:
$$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA;
$$->location = @5;
}
+ | SEQUENCE relation_expr
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_SEQUENCE;
+ $$->pubtable = makeNode(PublicationTable);
+ $$->pubtable->relation = $2;
+ }
+ | ALL SEQUENCES IN_P SCHEMA ColId
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA;
+ $$->name = $5;
+ $$->location = @5;
+ }
+ | ALL SEQUENCES IN_P SCHEMA CURRENT_SCHEMA
+ {
+ $$ = makeNode(PublicationObjSpec);
+ $$->pubobjtype = PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA;
+ $$->location = @5;
+ }
| ColId
{
$$ = makeNode(PublicationObjSpec);
@@ -10106,6 +10126,12 @@ UnlistenStmt:
}
;
+/*
+ * FIXME
+ *
+ * opt_publication_for_sequences and publication_for_sequences should be
+ * copies for sequences
+ */
/*****************************************************************************
*
@@ -10114,6 +10140,12 @@ UnlistenStmt:
* BEGIN / COMMIT / ROLLBACK
* (also older versions END / ABORT)
*
+ * ALTER PUBLICATION name ADD SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name DROP SEQUENCE sequence [, sequence2]
+ *
+ * ALTER PUBLICATION name SET SEQUENCE sequence [, sequence2]
+ *
*****************************************************************************/
TransactionStmt:
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 953942692ce..e8ead1387ae 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -647,6 +647,56 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
pq_sendbytes(out, message, sz);
}
+/*
+ * Write SEQUENCE to stream
+ */
+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
+ XLogRecPtr lsn, bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ uint8 flags = 0;
+ char *relname;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+
+ logicalrep_write_namespace(out, RelationGetNamespace(rel));
+ relname = RelationGetRelationName(rel);
+ pq_sendstring(out, relname);
+
+ pq_sendint8(out, transactional);
+ pq_sendint64(out, last_value);
+ pq_sendint64(out, log_cnt);
+ pq_sendint8(out, is_called);
+}
+
+/*
+ * Read SEQUENCE from the stream.
+ */
+void
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
+ /* XXX skipping flags and lsn */
+ pq_getmsgint(in, 1);
+ pq_getmsgint64(in);
+
+ /* Read relation name from stream */
+ seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
+ seqdata->seqname = pstrdup(pq_getmsgstring(in));
+
+ seqdata->transactional = pq_getmsgint(in, 1);
+ seqdata->last_value = pq_getmsgint64(in);
+ seqdata->log_cnt = pq_getmsgint64(in);
+ seqdata->is_called = pq_getmsgint(in, 1);
+}
+
/*
* Write relation description to the output stream.
*/
@@ -1203,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action)
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
+ case LOGICAL_REP_MSG_SEQUENCE:
+ return "SEQUENCE";
}
elog(ERROR, "invalid logical replication message type \"%c\"", action);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e596b69d466..c85867ee46b 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
+#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -359,6 +360,12 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
*
* If the synchronization position is reached (SYNCDONE), then the table can
* be marked as READY and is no longer tracked.
+ *
+ * XXX This needs to handle sequences too - after AlterSubscription_refresh
+ * starts caring about sequences, GetSubscriptionNotReadyRelations won't
+ * return just tables, and we'll have to sync them here. Not sure it's worth
+ * creating a new "sync" worker per sequence, maybe we should just sync them
+ * in the current process (it's pretty light-weight).
*/
static void
process_syncing_tables_for_apply(XLogRecPtr current_lsn)
@@ -873,6 +880,99 @@ copy_table(Relation rel)
logicalrep_rel_close(relmapentry, NoLock);
}
+
+
+/*
+ * FIXME add comment
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+ int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+ " FROM %s", quote_qualified_identifier(nspname, relname));
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process the sequence. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ bool isnull;
+
+ *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
+ */
+static void
+copy_sequence(Relation rel)
+{
+ LogicalRepRelMapEntry *relmapentry;
+ LogicalRepRelation lrel;
+ StringInfoData cmd;
+ int64 last_value = 0,
+ log_cnt = 0;
+ bool is_called = 0;
+
+ /* Get the publisher relation info. */
+ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+ RelationGetRelationName(rel), &lrel);
+
+ /* Put the relation into relmap. */
+ logicalrep_relmap_update(&lrel);
+
+ /* Map the publisher relation to local one. */
+ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+ Assert(rel == relmapentry->localrel);
+
+ /* Start copy on the publisher. */
+ initStringInfo(&cmd);
+
+ Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+ fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+ elog(WARNING, "sequence %s info last_value %ld log_cnt %ld is_called %d",
+ quote_qualified_identifier(lrel.nspname, lrel.relname),
+ last_value, log_cnt, is_called);
+
+ ResetSequence2(RelationGetRelid(rel), last_value, log_cnt, is_called);
+
+ logicalrep_rel_close(relmapentry, NoLock);
+}
+
+
+
+
/*
* Determine the tablesync slot name.
*
@@ -1134,10 +1234,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
- /* Now do the initial data copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
+ if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+ {
+ /* Now do the initial sequence copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_sequence(rel);
+ PopActiveSnapshot();
+ }
+ else
+ {
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
+ }
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d77bb32bb9e..68708d3907f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -144,6 +144,7 @@
#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
+#include "commands/sequence.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
@@ -1093,6 +1094,61 @@ apply_handle_origin(StringInfo s)
errmsg_internal("ORIGIN message sent out of order")));
}
+/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+ LogicalRepSequence seq;
+ Oid relid;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+ return;
+
+ logicalrep_read_sequence(s, &seq);
+
+ /*
+ * Non-transactional sequence updates should not be part of a remote
+ * transaction. There should not be any running transaction.
+ */
+ Assert((!seq.transactional) || in_remote_transaction);
+ Assert(!(!seq.transactional && in_remote_transaction));
+ Assert(!(!seq.transactional && IsTransactionState()));
+
+ /*
+ * Make sure we're in a transaction (needed by ResetSequence2). For
+ * non-transactional updates we're guaranteed to start a new one,
+ * and we'll commit it at the end.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ maybe_reread_subscription();
+ }
+
+ relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+ seq.seqname, -1),
+ RowExclusiveLock, false);
+
+ /* lock the sequence in AccessExclusiveLock, as expected by ResetSequence2 */
+ elog(WARNING, "locking sequence %d in exclusive mode", relid);
+ LockRelationOid(relid, AccessExclusiveLock);
+
+ elog(WARNING, "applying sequence %s.%s transactional %d last_value %ld log_cnt %ld is_called %d",
+ seq.nspname, seq.seqname, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /* apply the sequence change */
+ ResetSequence2(relid, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /*
+ * Commit the per-stream transaction (we only do this when not in
+ * remote transaction, i.e. for non-transactional sequence updates.
+ */
+ if (!in_remote_transaction)
+ CommitTransactionCommand();
+}
+
/*
* Handle STREAM START message.
*/
@@ -2421,6 +2477,10 @@ apply_dispatch(StringInfo s)
*/
break;
+ case LOGICAL_REP_MSG_SEQUENCE:
+ apply_handle_sequence(s);
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
break;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6df705f90ff..b82c6b10305 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -49,6 +49,10 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -161,6 +165,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
+ cb->sequence_cb = pgoutput_sequence;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -177,6 +182,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
+ cb->stream_sequence_cb = pgoutput_sequence;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -190,6 +196,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
+ bool sequences_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
@@ -197,6 +204,7 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = false;
data->messages = false;
data->two_phase = false;
+ data->sequences = true;
foreach(lc, options)
{
@@ -262,6 +270,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "sequences") == 0)
+ {
+ if (sequences_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ sequences_option_given = true;
+
+ data->sequences = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -858,6 +876,51 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static void
+pgoutput_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+ RelationSyncEntry *relentry;
+
+ if (!data->sequences)
+ return;
+
+ if (!is_publishable_relation(rel))
+ return;
+
+ /*
+ * Remember the xid for the message in streaming mode. See
+ * pgoutput_change.
+ */
+ if (in_streaming)
+ xid = txn->xid;
+
+ relentry = get_rel_sync_entry(data, RelationGetRelid(rel));
+
+ /*
+ * First check the sequence filter.
+ *
+ * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
+ */
+ if (!relentry->pubactions.pubsequence)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_sequence(ctx->out,
+ rel,
+ xid,
+ sequence_lsn,
+ transactional,
+ last_value,
+ log_cnt,
+ is_called);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/
@@ -1141,7 +1204,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->schema_sent = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
+ entry->pubactions.pubsequence = false;
entry->publish_as_relid = InvalidOid;
entry->map = NULL; /* will be set by maybe_send_schema() if
* needed */
@@ -1163,6 +1227,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Oid publish_as_relid = relid;
bool am_partition = get_rel_relispartition(relid);
char relkind = get_rel_relkind(relid);
+ bool is_sequence = (get_rel_relkind(relid) == RELKIND_SEQUENCE);
/* Reload publications if needed before use. */
if (!publications_valid)
@@ -1191,6 +1256,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
+ entry->pubactions.pubsequence = false;
if (entry->map)
{
/*
@@ -1213,12 +1279,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
Publication *pub = lfirst(lc);
bool publish = false;
- if (pub->alltables)
+ if (pub->alltables && (!is_sequence))
{
publish = true;
if (pub->pubviaroot && am_partition)
publish_as_relid = llast_oid(get_partition_ancestors(relid));
}
+ else if (pub->allsequences && is_sequence)
+ {
+ publish = true;
+ }
+
+ /* if a sequence, just cross-check the list of publications */
+ if (!publish && is_sequence)
+ {
+ if (list_member_oid(pubids, pub->oid))
+ publish = true;
+ }
if (!publish)
{
@@ -1275,10 +1352,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+ entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
}
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
- entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
+ entry->pubactions.pubdelete && entry->pubactions.pubtruncate &&
+ entry->pubactions.pubsequence)
break;
}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 2707fed12f4..45a8b3e490a 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5586,6 +5586,7 @@ GetRelationPublicationActions(Relation relation)
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
pubactions->pubtruncate |= pubform->pubtruncate;
+ pubactions->pubsequence |= pubform->pubsequence;
ReleaseSysCache(tup);
@@ -5594,7 +5595,8 @@ GetRelationPublicationActions(Relation relation)
* other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate &&
- pubactions->pubdelete && pubactions->pubtruncate)
+ pubactions->pubdelete && pubactions->pubtruncate &&
+ pubactions->pubsequence)
break;
}
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 98882272130..4900c48ff19 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1782,20 +1782,20 @@ psql_completion(const char *text, int start, int end)
COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET");
/* ALTER PUBLICATION <name> ADD */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
- COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
- else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") ||
- (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE") &&
+ COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE|SEQUENCE");
+ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE|SEQUENCE") ||
+ (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE|SEQUENCE") &&
ends_with(prev_wd, ',')))
COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables);
- else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE"))
+ else if (HeadMatches("ALTER", "PUBLICATION", MatchAny, "ADD|SET", "TABLE|SEQUENCE"))
COMPLETE_WITH(",");
/* ALTER PUBLICATION <name> DROP */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
- COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+ COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE|SEQUENCE");
/* ALTER PUBLICATION <name> SET */
else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
- COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE");
- else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES", "IN", "SCHEMA"))
+ COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE|SEQUENCE");
+ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES|SEQUENCES", "IN", "SCHEMA"))
COMPLETE_WITH_QUERY_PLUS(Query_for_list_of_schemas
" AND nspname NOT LIKE E'pg\\\\_%'",
"CURRENT_SCHEMA");
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 62f36daa981..a66eb8109d0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11546,6 +11546,11 @@
provolatile => 's', prorettype => 'oid', proargtypes => 'text',
proallargtypes => '{text,oid}', proargmodes => '{i,o}',
proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_tables' },
+{ oid => '8000', descr => 'get OIDs of sequences in a publication',
+ proname => 'pg_get_publication_sequences', prorows => '1000', proretset => 't',
+ provolatile => 's', prorettype => 'oid', proargtypes => 'text',
+ proallargtypes => '{text,oid}', proargmodes => '{i,o}',
+ proargnames => '{pubname,relid}', prosrc => 'pg_get_publication_sequences' },
{ oid => '6121',
descr => 'returns whether a relation can be part of a publication',
proname => 'pg_relation_is_publishable', provolatile => 's',
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 841b9b6c253..e56286772f4 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -40,6 +40,12 @@ CATALOG(pg_publication,6104,PublicationRelationId)
*/
bool puballtables;
+ /*
+ * indicates that this is special publication which should encompass all
+ * sequences in the database (except for the unlogged and temp ones)
+ */
+ bool puballsequences;
+
/* true if inserts are published */
bool pubinsert;
@@ -52,6 +58,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
/* true if truncates are published */
bool pubtruncate;
+ /* true if sequences are published */
+ bool pubsequence;
+
/* true if partition changes are published using root schema */
bool pubviaroot;
} FormData_pg_publication;
@@ -72,6 +81,7 @@ typedef struct PublicationActions
bool pubupdate;
bool pubdelete;
bool pubtruncate;
+ bool pubsequence;
} PublicationActions;
typedef struct Publication
@@ -79,6 +89,7 @@ typedef struct Publication
Oid oid;
char *name;
bool alltables;
+ bool allsequences;
bool pubviaroot;
PublicationActions pubactions;
} Publication;
@@ -121,6 +132,9 @@ extern List *GetPubPartitionOptionRelations(List *result,
PublicationPartOpt pub_partopt,
Oid relid);
+extern List *GetAllSequencesPublicationRelations(void);
+extern List *GetPublicationSequenceRelations(Oid pubid);
+
extern bool is_publishable_relation(Relation rel);
extern bool is_schema_publication(Oid pubid);
extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 9fecc41954e..d8c255a7af5 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -60,6 +60,7 @@ extern ObjectAddress DefineSequence(ParseState *pstate, CreateSeqStmt *stmt);
extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern void ResetSequence2(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called);
extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *rptr);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 37fcc4c9b5a..4a990364e4a 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3656,6 +3656,10 @@ typedef enum PublicationObjSpecType
PUBLICATIONOBJ_TABLES_IN_SCHEMA, /* All tables in schema */
PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA, /* All tables in first element of
* search_path */
+ PUBLICATIONOBJ_SEQUENCE, /* Sequence type */
+ PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA, /* Sequences in schema type */
+ PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA, /* Get the first element of
+ * search_path */
PUBLICATIONOBJ_CONTINUATION /* Continuation of previous type */
} PublicationObjSpecType;
@@ -3675,6 +3679,7 @@ typedef struct CreatePublicationStmt
List *options; /* List of DefElem nodes */
List *pubobjects; /* Optional list of publication objects */
bool for_all_tables; /* Special publication for all tables in db */
+ bool for_all_sequences; /* Special publication for all sequences in db */
} CreatePublicationStmt;
typedef enum AlterPublicationAction
@@ -3698,6 +3703,7 @@ typedef struct AlterPublicationStmt
*/
List *pubobjects; /* Optional list of publication objects */
bool for_all_tables; /* Special publication for all tables in db */
+ bool for_all_sequences; /* Special publication for all sequences in db */
AlterPublicationAction action; /* What action to perform with the given
* objects */
} AlterPublicationStmt;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 22fffaca62d..8f8c325522d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -60,6 +60,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_RELATION = 'R',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
+ LOGICAL_REP_MSG_SEQUENCE = 'X', /* FIXME change */
LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
LOGICAL_REP_MSG_PREPARE = 'P',
LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -117,6 +118,18 @@ typedef struct LogicalRepTyp
char *typname; /* name of the remote type */
} LogicalRepTyp;
+/* Sequence info */
+typedef struct LogicalRepSequence
+{
+ Oid remoteid; /* unique id of the remote sequence */
+ char *nspname; /* schema name of remote sequence */
+ char *seqname; /* name of the remote sequence */
+ bool transactional;
+ int64 last_value;
+ int64 log_cnt;
+ bool is_called;
+} LogicalRepSequence;
+
/* Transaction info */
typedef struct LogicalRepBeginData
{
@@ -227,6 +240,12 @@ extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_write_sequence(StringInfo out, Relation rel,
+ TransactionId xid, XLogRecPtr lsn,
+ bool transactional,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+extern void logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 78aa9151ef5..a6f6843ada6 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -28,6 +28,7 @@ typedef struct PGOutputData
bool streaming;
bool messages;
bool two_phase;
+ bool sequences;
} PGOutputData;
#endif /* PGOUTPUT_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1420288d67b..3f50e100f8d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1429,6 +1429,14 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_publication_sequences| SELECT p.pubname,
+ n.nspname AS schemaname,
+ c.relname AS sequencename
+ FROM pg_publication p,
+ LATERAL pg_get_publication_sequences((p.pubname)::text) gpt(relid),
+ (pg_class c
+ JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
+ WHERE (c.oid = gpt.relid);
pg_publication_tables| SELECT p.pubname,
n.nspname AS schemaname,
c.relname AS tablename
diff --git a/src/test/subscription/t/028_sequences.pl b/src/test/subscription/t/028_sequences.pl
new file mode 100644
index 00000000000..58775769cdc
--- /dev/null
+++ b/src/test/subscription/t/028_sequences.pl
@@ -0,0 +1,196 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This tests that sequences are replicated correctly by logical replication
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 6;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+my $ddl = qq(
+ CREATE SEQUENCE s;
+);
+
+# Setup structure on the publisher
+$node_publisher->safe_psql('postgres', $ddl);
+
+# Create some the same structure on subscriber, and an extra sequence that
+# we'll create on the publisher later
+$ddl = qq(
+ CREATE SEQUENCE s;
+ CREATE SEQUENCE s2;
+);
+
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION seq_pub");
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION seq_pub ADD SEQUENCE s");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub WITH (slot_name = seq_sub_slot)"
+);
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for initial sync to finish as well
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert initial test data
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ -- generate a number of values using the sequence
+ SELECT nextval('s') FROM generate_series(1,100);
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s;
+));
+
+is( $result, '132|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# advance the sequence in a rolled-back transaction - should not be replicated
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT nextval('s') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s;
+));
+
+is( $result, '231|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# create a new sequence and roll it back - should not be replicated, due to
+# the transactional behavior
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ CREATE SEQUENCE s2;
+ ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '1|0|f',
+ 'check replicated sequence values on subscriber');
+
+
+# create a new sequence, advance it in a rolled-back transaction, but commit
+# the create - the advance should be replicated nevertheless
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ CREATE SEQUENCE s2;
+ ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
+ SAVEPOINT sp1;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK TO sp1;
+ COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Wait for sync of the second sequence we just added to finish
+$synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '132|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# advance the new sequence in a transaction, and roll it back - in this case
+# it should not be replicated at commit
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '231|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+# advance the sequence in a subtransaction - the subtransaction gets rolled
+# back, but commit the main one - the changes should still be replicated
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SAVEPOINT s1;
+ SELECT nextval('s2') FROM generate_series(1,100);
+ ROLLBACK TO s1;
+ COMMIT;
+));
+
+$node_publisher->wait_for_catchup('seq_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s2;
+));
+
+is( $result, '330|0|t',
+ 'check replicated sequence values on subscriber');
+
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
--
2.34.1
From aa91b8e9a469a4fa13a8b185dfda98e45ee4b8c3 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Sat, 12 Feb 2022 01:24:47 +0100
Subject: [PATCH 2/2] tweak test
---
src/test/subscription/t/028_sequences.pl | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
diff --git a/src/test/subscription/t/028_sequences.pl b/src/test/subscription/t/028_sequences.pl
index 58775769cdc..58ed02462d8 100644
--- a/src/test/subscription/t/028_sequences.pl
+++ b/src/test/subscription/t/028_sequences.pl
@@ -20,6 +20,7 @@ $node_subscriber->start;
# Create some preexisting content on publisher
my $ddl = qq(
+ CREATE TABLE seq_test (v BIGINT);
CREATE SEQUENCE s;
);
@@ -29,6 +30,7 @@ $node_publisher->safe_psql('postgres', $ddl);
# Create some the same structure on subscriber, and an extra sequence that
# we'll create on the publisher later
$ddl = qq(
+ CREATE TABLE seq_test (v BIGINT);
CREATE SEQUENCE s;
CREATE SEQUENCE s2;
);
@@ -59,7 +61,7 @@ $node_subscriber->poll_query_until('postgres', $synced_query)
$node_publisher->safe_psql(
'postgres', qq(
-- generate a number of values using the sequence
- SELECT nextval('s') FROM generate_series(1,100);
+ INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100);
));
$node_publisher->wait_for_catchup('seq_sub');
@@ -78,7 +80,7 @@ is( $result, '132|0|t',
$node_publisher->safe_psql(
'postgres', qq(
BEGIN;
- SELECT nextval('s') FROM generate_series(1,100);
+ INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100);
ROLLBACK;
));
@@ -101,7 +103,7 @@ $node_publisher->safe_psql(
BEGIN;
CREATE SEQUENCE s2;
ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
- SELECT nextval('s2') FROM generate_series(1,100);
+ INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
ROLLBACK;
));
@@ -125,7 +127,7 @@ $node_publisher->safe_psql(
CREATE SEQUENCE s2;
ALTER PUBLICATION seq_pub ADD SEQUENCE s2;
SAVEPOINT sp1;
- SELECT nextval('s2') FROM generate_series(1,100);
+ INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
ROLLBACK TO sp1;
COMMIT;
));
@@ -153,7 +155,7 @@ is( $result, '132|0|t',
$node_publisher->safe_psql(
'postgres', qq(
BEGIN;
- SELECT nextval('s2') FROM generate_series(1,100);
+ INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
ROLLBACK;
));
@@ -175,7 +177,7 @@ $node_publisher->safe_psql(
'postgres', qq(
BEGIN;
SAVEPOINT s1;
- SELECT nextval('s2') FROM generate_series(1,100);
+ INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100);
ROLLBACK TO s1;
COMMIT;
));
--
2.34.1