Hi Peter
,
Thanks for the review and sorry it took me a while to get back.

On Wed, Jan 8, 2020 at 7:54 PM Peter Eisentraut
<peter.eisentr...@2ndquadrant.com> wrote:
> Looking through 0001, I think perhaps there is a better way to structure
> some of the API changes.
>
> Instead of passing the root_target_rel to CheckValidResultRel() and
> CheckCmdReplicaIdentity(), which we only need to check the publication
> actions of the root table, how about changing
> GetRelationPublicationActions() to automatically include the publication
> information of the root table.  Then we have that information in the
> relcache once and don't need to check the base table and the partition
> root separately at each call site (of which there is only one right
> now).  (Would that work correctly with relcache invalidation?)
>
> Similarly, couldn't GetRelationPublications() just automatically take
> partitioning into account?  We don't need the separation between
> GetRelationPublications() and GetRelationAncestorPublications().  This
> would also avoid errors of omission, for example the
> GetRelationPublications() call in ATPrepChangePersistence() doesn't take
> GetRelationAncestorPublications() into account.

I have addressed these comments in the attached updated patch.

Other than that, the updated patch contains following significant changes:

* Changed pg_publication.c: GetPublicationRelations() so that any
published partitioned tables are expanded as needed

* Since the pg_publication_tables view is backed by
GetPublicationRelations(), that means subscriptioncmds.c:
fetch_table_list() no longer needs to craft a query to include
partitions when needed, because partitions are included at source.
That seems better, because it allows to limit the complexity
surrounding publication of partitioned tables to the publication side.

* Fixed the publication table DDL to spot more cases of tables being
added to a publication in a duplicative manner.  For example,
partition being added to a publication which already contains its
ancestor and a partitioned tables being added to a publication
(implying all of its partitions are added) which already contains a
partition

Only attaching 0001.  Will send the rest after polishing them a bit more.

Thanks,
Amit
From 72eb76b32daa384074beaa3b3b1946db8fd154a8 Mon Sep 17 00:00:00 2001
From: amit <amitlangot...@gmail.com>
Date: Thu, 7 Nov 2019 18:19:33 +0900
Subject: [PATCH v9] Support adding partitioned tables to publication

---
 doc/src/sgml/logical-replication.sgml       |  18 +--
 doc/src/sgml/ref/create_publication.sgml    |  20 +++-
 src/backend/catalog/pg_publication.c        | 164 ++++++++++++++++++++++---
 src/backend/commands/publicationcmds.c      |  16 ++-
 src/backend/replication/logical/tablesync.c |   1 +
 src/backend/replication/pgoutput/pgoutput.c |  19 ++-
 src/bin/pg_dump/pg_dump.c                   |   8 +-
 src/include/catalog/pg_publication.h        |   2 +-
 src/test/regress/expected/publication.out   |  30 ++++-
 src/test/regress/sql/publication.sql        |  18 ++-
 src/test/subscription/t/013_partition.pl    | 178 ++++++++++++++++++++++++++++
 11 files changed, 428 insertions(+), 46 deletions(-)
 create mode 100644 src/test/subscription/t/013_partition.pl

diff --git a/doc/src/sgml/logical-replication.sgml 
b/doc/src/sgml/logical-replication.sgml
index f657d1d06e..fa30ac27f7 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -402,13 +402,17 @@
 
    <listitem>
     <para>
-     Replication is only possible from base tables to base tables.  That is,
-     the tables on the publication and on the subscription side must be normal
-     tables, not views, materialized views, partition root tables, or foreign
-     tables.  In the case of partitions, you can therefore replicate a
-     partition hierarchy one-to-one, but you cannot currently replicate to a
-     differently partitioned setup.  Attempts to replicate tables other than
-     base tables will result in an error.
+     Replication is only supported by regular and partitioned tables, although
+     the type of the table must match between the two servers, that is, one
+     cannot replicate from a regular table into a partitioned able or vice
+     versa. Also, when replicating between partitioned tables, the actual
+     replication occurs between leaf partitions, so the partitions on the two
+     servers must match one-to-one.
+    </para>
+
+    <para>
+     Attempts to replicate other types of relations such as views, materialized
+     views, or foreign tables, will result in an error.
     </para>
    </listitem>
   </itemizedlist>
diff --git a/doc/src/sgml/ref/create_publication.sgml 
b/doc/src/sgml/ref/create_publication.sgml
index 99f87ca393..a304f9b8c3 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -68,15 +68,23 @@ CREATE PUBLICATION <replaceable 
class="parameter">name</replaceable>
       that table is added to the publication.  If <literal>ONLY</literal> is 
not
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
-      explicitly indicate that descendant tables are included.
+      explicitly indicate that descendant tables are included.  However, adding
+      a partitioned table to a publication never explicitly adds its 
partitions,
+      because partitions are implicitly published due to the partitioned table
+      being added to the publication.
      </para>
 
      <para>
-      Only persistent base tables can be part of a publication.  Temporary
-      tables, unlogged tables, foreign tables, materialized views, regular
-      views, and partitioned tables cannot be part of a publication.  To
-      replicate a partitioned table, add the individual partitions to the
-      publication.
+      Only persistent base tables and partitioned tables can be part of a
+      publication. Temporary tables, unlogged tables, foreign tables,
+      materialized views, regular views cannot be part of a publication.
+     </para>
+
+     <para>
+      When a partitioned table is added to a publication, all of its existing
+      and future partitions are also implicitly considered to be part of the
+      publication.  So, even operations that are performed directly on a
+      partition are also published via its ancestors' publications.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/pg_publication.c 
b/src/backend/catalog/pg_publication.c
index c5eea7af3f..c05617dec9 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -24,8 +24,10 @@
 #include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
+#include "catalog/partition.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/pg_inherits.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_type.h"
@@ -40,6 +42,8 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
+static List *get_rel_publications(Oid relid);
+
 /*
  * Check if relation can be in given publication and throws appropriate
  * error if not.
@@ -47,17 +51,9 @@
 static void
 check_publication_add_relation(Relation targetrel)
 {
-       /* Give more specific error for partitioned tables */
-       if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("\"%s\" is a partitioned table",
-                                               
RelationGetRelationName(targetrel)),
-                                errdetail("Adding partitioned tables to 
publications is not supported."),
-                                errhint("You can add the table partitions 
individually.")));
-
-       /* Must be table */
-       if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
+       /* Must be a regular or partitioned table */
+       if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
+               RelationGetForm(targetrel)->relkind != 
RELKIND_PARTITIONED_TABLE)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                 errmsg("\"%s\" is not a table",
@@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel)
 static bool
 is_publishable_class(Oid relid, Form_pg_class reltuple)
 {
-       return reltuple->relkind == RELKIND_RELATION &&
+       return (reltuple->relkind == RELKIND_RELATION ||
+                       reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
                !IsCatalogRelationOid(relid) &&
                reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
                relid >= FirstNormalObjectId;
@@ -165,6 +162,10 @@ publication_add_relation(Oid pubid, Relation targetrel,
         * Check for duplicates. Note that this does not really prevent
         * duplicates, it's here just to provide nicer error message in common
         * case. The real protection is the unique key on the catalog.
+        *
+        * We give special messages for when a partition is found to be 
implicitly
+        * published via an ancestor and when a partitioned tables's partitions
+        * are found to be published on their own.
         */
        if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
                                                          
ObjectIdGetDatum(pubid)))
@@ -179,6 +180,71 @@ publication_add_relation(Oid pubid, Relation targetrel,
                                 errmsg("relation \"%s\" is already member of 
publication \"%s\"",
                                                
RelationGetRelationName(targetrel), pub->name)));
        }
+       else if (targetrel->rd_rel->relispartition)
+       {
+               List   *ancestors = get_partition_ancestors(relid);
+               ListCell *lc;
+               Oid             ancestor;
+               bool    found = false;
+
+               foreach(lc, ancestors)
+               {
+                       ancestor = lfirst_oid(lc);
+                       if (SearchSysCacheExists2(PUBLICATIONRELMAP,
+                                                                         
ObjectIdGetDatum(ancestor),
+                                                                         
ObjectIdGetDatum(pubid)))
+                       {
+                               found = true;
+                               break;
+                       }
+               }
+
+               if (found)
+               {
+                       Assert(OidIsValid(ancestor));
+                       table_close(rel, RowExclusiveLock);
+
+                       if (if_not_exists)
+                               return InvalidObjectAddress;
+
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DUPLICATE_OBJECT),
+                                        errmsg("relation \"%s\" is already 
member of publication \"%s\" via ancestor \"%s\"",
+                                                       
RelationGetRelationName(targetrel), pub->name,
+                                                       
get_rel_name(ancestor))));
+               }
+       }
+       else if (targetrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       {
+               List   *pub_rels = GetPublicationRelations(pubid, true);
+               List   *parts = find_all_inheritors(relid, NoLock, NULL);
+               ListCell *lc;
+               Oid             partition;
+               bool    found = false;
+
+               foreach(lc, parts)
+               {
+                       partition = lfirst_oid(lc);
+                       if (list_member_oid(pub_rels, partition))
+                       {
+                               found = true;
+                               break;
+                       }
+               }
+
+               if (found)
+               {
+                       Assert(OidIsValid(partition));
+                       table_close(rel, RowExclusiveLock);
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DUPLICATE_OBJECT),
+                                        errmsg("descendent table \"%s\" of 
\"%s\"is already member of publication \"%s\"",
+                                                       get_rel_name(partition),
+                                                       
RelationGetRelationName(targetrel), pub->name),
+                                        errhint("Remove descendent tables of 
\"%s\" from publication before adding it to the publication.",
+                                                        
RelationGetRelationName(targetrel))));
+               }
+       }
 
        check_publication_add_relation(targetrel);
 
@@ -221,10 +287,35 @@ publication_add_relation(Oid pubid, Relation targetrel,
 
 
 /*
- * Gets list of publication oids for a relation oid.
+ * Gets list of publication oids for a relation, plus those of ancestors,
+ * if any, if the relation is a partition.
  */
 List *
 GetRelationPublications(Oid relid)
+{
+       List       *result = NIL;
+
+       result = get_rel_publications(relid);
+       if (get_rel_relispartition(relid))
+       {
+               List       *ancestors = get_partition_ancestors(relid);
+               ListCell   *lc;
+
+               foreach(lc, ancestors)
+               {
+                       Oid                     ancestor = lfirst_oid(lc);
+                       List       *ancestor_pubs = 
get_rel_publications(ancestor);
+
+                       result = list_concat(result, ancestor_pubs);
+               }
+       }
+
+       return result;
+}
+
+/* Workhorse of GetRelationPublications() */
+static List *
+get_rel_publications(Oid relid)
 {
        List       *result = NIL;
        CatCList   *pubrellist;
@@ -251,9 +342,14 @@ GetRelationPublications(Oid relid)
  *
  * This should only be used for normal publications, the FOR ALL TABLES
  * should use GetAllTablesPublicationRelations().
+ *
+ * Caller should pass true for 'include_partitions' so that for any
+ * partitioned tables that are in the publication its partitions are
+ * included too if the operation to be performed on the returned relations
+ * expects to see all relations that are affected by the publication.
  */
 List *
-GetPublicationRelations(Oid pubid)
+GetPublicationRelations(Oid pubid, bool include_partitions)
 {
        List       *result;
        Relation        pubrelsrel;
@@ -278,8 +374,12 @@ GetPublicationRelations(Oid pubid)
                Form_pg_publication_rel pubrel;
 
                pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
-
-               result = lappend_oid(result, pubrel->prrelid);
+               if (get_rel_relkind(pubrel->prrelid) == 
RELKIND_PARTITIONED_TABLE &&
+                       include_partitions)
+                       result = list_concat(result, 
find_all_inheritors(pubrel->prrelid,
+                                                                               
                                         NoLock, NULL));
+               else
+                       result = lappend_oid(result, pubrel->prrelid);
        }
 
        systable_endscan(scan);
@@ -480,10 +580,40 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
                oldcontext = 
MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
                publication = GetPublicationByName(pubname, false);
+
+               /*
+                * Publications support partitioned tables, although we need to 
filter
+                * them out from the result, because all changes are replicated 
using
+                * the leaf partition identity and schema.
+                */
                if (publication->alltables)
+               {
+                       /*
+                        * GetAllTablesPublicationRelations() only ever returns 
leaf
+                        * partitions.
+                        */
                        tables = GetAllTablesPublicationRelations();
+               }
                else
-                       tables = GetPublicationRelations(publication->oid);
+               {
+                       List   *all_tables;
+                       ListCell *lc;
+
+                       /*
+                        * GetPublicationRelations() includes partitioned 
tables in its
+                        * result which is required by other internal users of 
that
+                        * function, which must be filtered out.
+                        */
+                       all_tables = GetPublicationRelations(publication->oid, 
true);
+                       tables = NIL;
+                       foreach(lc, all_tables)
+                       {
+                               Oid             relid = lfirst_oid(lc);
+
+                               if (get_rel_relkind(relid) != 
RELKIND_PARTITIONED_TABLE)
+                                       tables = lappend_oid(tables, relid);
+                       }
+               }
                funcctx->user_fctx = (void *) tables;
 
                MemoryContextSwitchTo(oldcontext);
diff --git a/src/backend/commands/publicationcmds.c 
b/src/backend/commands/publicationcmds.c
index f96cb42adc..d4b43e7662 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -299,7 +299,7 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, 
Relation rel,
        }
        else
        {
-               List       *relids = GetPublicationRelations(pubform->oid);
+               List       *relids = GetPublicationRelations(pubform->oid, 
true);
 
                /*
                 * We don't want to send too many individual messages, at some 
point
@@ -356,7 +356,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation 
rel,
                PublicationDropTables(pubid, rels, false);
        else                                            /* DEFELEM_SET */
        {
-               List       *oldrelids = GetPublicationRelations(pubid);
+               List       *oldrelids = GetPublicationRelations(pubid, false);
                List       *delrels = NIL;
                ListCell   *oldlc;
 
@@ -498,7 +498,8 @@ RemovePublicationRelById(Oid proid)
 
 /*
  * Open relations specified by a RangeVar list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
  */
 static List *
 OpenTableList(List *tables)
@@ -539,8 +540,13 @@ OpenTableList(List *tables)
                rels = lappend(rels, rel);
                relids = lappend_oid(relids, myrelid);
 
-               /* Add children of this rel, if requested */
-               if (recurse)
+               /*
+                * Add children of this rel, if requested, so that they too are 
added
+                * to the publication.  A partitioned table can't have any 
inheritance
+                * children other than its partitions, which need not be 
explicitly
+                * added to the publication.
+                */
+               if (recurse && rel->rd_rel->relkind != 
RELKIND_PARTITIONED_TABLE)
                {
                        List       *children;
                        ListCell   *child;
diff --git a/src/backend/replication/logical/tablesync.c 
b/src/backend/replication/logical/tablesync.c
index f8183cd488..98825f01e9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -761,6 +761,7 @@ copy_table(Relation rel)
        /* Map the publisher relation to local one. */
        relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
        Assert(rel == relmapentry->localrel);
+       Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
 
        /* Start copy on the publisher. */
        initStringInfo(&cmd);
diff --git a/src/backend/replication/pgoutput/pgoutput.c 
b/src/backend/replication/pgoutput/pgoutput.c
index 752508213a..d6b9cbe1bd 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
                                                                                
uint32 hashvalue);
 
-/* Entry in the map used to remember which relation schemas we sent. */
+/*
+ * Entry in the map used to remember which relation schemas we sent.
+ *
+ * For partitions, 'pubactions' considers not only the table's own
+ * publications, but also those of all of its ancestors.
+ */
 typedef struct RelationSyncEntry
 {
        Oid                     relid;                  /* relation oid */
@@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
                if (!relentry->pubactions.pubtruncate)
                        continue;
 
+               /*
+                * Don't send partitioned tables, because partitions would be
+                * sent instead.
+                */
+               if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+                       continue;
+
                relids[nrelids++] = relid;
                maybe_send_schema(ctx, relation, relentry);
        }
@@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 /*
  * Find or create entry in the relation schema cache.
+ *
+ * This looks up publications that given relation is directly or indirectly
+ * part of (latter if it's really the relation's ancestor that is part of a
+ * publication) and fills up the found entry with the information about
+ * which operations to publish.
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Oid relid)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 799b6988b7..dc33c20048 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3969,8 +3969,12 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], 
int numTables)
        {
                TableInfo  *tbinfo = &tblinfo[i];
 
-               /* Only plain tables can be aded to publications. */
-               if (tbinfo->relkind != RELKIND_RELATION)
+               /*
+                * Only regular and partitioned tables can be added to
+                * publications.
+                */
+               if (tbinfo->relkind != RELKIND_RELATION &&
+                       tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
                        continue;
 
                /*
diff --git a/src/include/catalog/pg_publication.h 
b/src/include/catalog/pg_publication.h
index 6cdc2b1197..04a8b87e78 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -80,7 +80,7 @@ typedef struct Publication
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
-extern List *GetPublicationRelations(Oid pubid);
+extern List *GetPublicationRelations(Oid pubid, bool include_partitions);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
 
diff --git a/src/test/regress/expected/publication.out 
b/src/test/regress/expected/publication.out
index feb51e4add..d1d9b90c50 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -116,6 +116,31 @@ Tables:
 
 DROP TABLE testpub_tbl3, testpub_tbl3a;
 DROP PUBLICATION testpub3, testpub4;
+-- Tests for partitioned tables
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_forparted;
+RESET client_min_messages;
+-- should add only the parent to publication, not the partition
+CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1);
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
+\dRp+ testpub_forparted
+                          Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | 
Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
+Tables:
+    "public.testpub_parted"
+
+-- fail - can't re-add partition
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1;
+ERROR:  relation "testpub_parted1" is already member of publication 
"testpub_forparted" via ancestor "testpub_parted"
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1;
+-- fail - can't re-add partition
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
+ERROR:  descendent table "testpub_parted1" of "testpub_parted"is already 
member of publication "testpub_forparted"
+HINT:  Remove descendent tables of "testpub_parted" from publication before 
adding it to the publication.
+DROP PUBLICATION testpub_forparted;
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 ERROR:  "testpub_view" is not a table
@@ -142,11 +167,6 @@ Tables:
 ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
 ERROR:  "testpub_view" is not a table
 DETAIL:  Only tables can be added to publications.
--- fail - partitioned table
-ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
-ERROR:  "testpub_parted" is a partitioned table
-DETAIL:  Adding partitioned tables to publications is not supported.
-HINT:  You can add the table partitions individually.
 ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
diff --git a/src/test/regress/sql/publication.sql 
b/src/test/regress/sql/publication.sql
index 5773a755cf..7074c08efd 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -69,6 +69,22 @@ RESET client_min_messages;
 DROP TABLE testpub_tbl3, testpub_tbl3a;
 DROP PUBLICATION testpub3, testpub4;
 
+-- Tests for partitioned tables
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_forparted;
+RESET client_min_messages;
+-- should add only the parent to publication, not the partition
+CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1);
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
+\dRp+ testpub_forparted
+-- fail - can't re-add partition
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1;
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1;
+-- fail - can't re-add partition
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
+DROP PUBLICATION testpub_forparted;
+
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 SET client_min_messages = 'ERROR';
@@ -83,8 +99,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 
 -- fail - view
 ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
--- fail - partitioned table
-ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
 
 ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
diff --git a/src/test/subscription/t/013_partition.pl 
b/src/test/subscription/t/013_partition.pl
new file mode 100644
index 0000000000..1fa392b618
--- /dev/null
+++ b/src/test/subscription/t/013_partition.pl
@@ -0,0 +1,178 @@
+# Test PARTITION
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 15;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber1 = get_new_node('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $node_subscriber2 = get_new_node('subscriber2');
+$node_subscriber2->init(allows_streaming => 'logical');
+$node_subscriber2->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# publisher
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub1");
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub_all FOR ALL TABLES");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+       "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)");
+$node_publisher->safe_psql('postgres',
+       "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1");
+
+# subscriber1
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY 
LIST (a)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT 
NULL)");
+$node_subscriber1->safe_psql('postgres',
+       "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR 
VALUES IN (5, 6)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION 
pub1");
+
+# subscriber 2
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b 
text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', 
b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', 
b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION 
pub_all");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 
's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1 VALUES (1)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1_1 (a) VALUES (3)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1_2 VALUES (5)");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+my $result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
+is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
+is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated');
+
+# update (no partition change)
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
+is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated');
+
+# update (partition changes)
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 6 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
+is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
+is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated');
+
+# delete
+$node_publisher->safe_psql('postgres',
+       "DELETE FROM tab1 WHERE a IN (3, 5)");
+$node_publisher->safe_psql('postgres',
+       "DELETE FROM tab1_2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1_1");
+is($result, qq(0||), 'delete from tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1_2");
+is($result, qq(0||), 'delete from tab1_2 replicated');
+
+# truncate
+$node_subscriber1->safe_psql('postgres',
+       "INSERT INTO tab1 VALUES (1), (2), (5)");
+$node_subscriber2->safe_psql('postgres',
+       "INSERT INTO tab1_2 VALUES (2)");
+$node_publisher->safe_psql('postgres',
+       "TRUNCATE tab1_2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(2|1|2), 'truncate of tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1_2");
+is($result, qq(0||), 'truncate of tab1_2 replicated');
+
+$node_publisher->safe_psql('postgres',
+       "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'truncate of tab1_1 replicated');
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'truncate of tab1_1 replicated');
-- 
2.16.5

Reply via email to