Hello! Thank you for the questions!
> I've got few questions:
> 1. How will the subscription work for inherited tables? Do we need tests for
> that?

Subscriptions created with `FOR TABLE` can't support inherited tables, because the subscriber doesn't know anything about inheritance. In the new patch I removed `ONLY` for `FOR TABLE` in the subscription-related statements.

> 2. ALTER PUBLICATION has ADD\DROP and SET. Should we add SET too? Or is there
> a reason not to do that?

Added in the new patch.

> 3. Message "Must be superuser to create FOR ALL TABLES subscriptions" seems a
> bit strange to me. Also, this literal is embedded into translations. I do not
> know how we deal with it, how do we deal for example with "måste vara
> superanvändare för att skapa prenumerationer" or "для создания подписок нужно
> быть суперпользователем"? Where do we insert FOR ALL TABLES?

I kept the original message and added the hint `Use CREATE SUBSCRIPTION ... FOR TABLE ...`.

> 4. How does default behavior differ from FOR ALL TABLES?

It doesn't: `FOR ALL TABLES` behaves the same as the default.

> 5. Can we alter subscription FOR ALL TABLES? Drop some tables out of the
> subscription?

For subscriptions created with `FOR ALL TABLES` (the default), you can't change the subscribed tables with `ALTER SUBSCRIPTION ... ADD/DROP TABLE`; you should use `ALTER SUBSCRIPTION ... REFRESH PUBLICATION` instead.

I also don't know how to handle export of user-created subscriptions, because a non-superuser currently can't select the subconninfo column.

03.12.2018, 09:06, "Andrey Borodin" <x4...@yandex-team.ru>:
> Hi, Evgeniy!
>
> Thanks for working on the feature.
>> On 28 Nov 2018, at 21:41, Evgeniy Efimkin <efim...@yandex-team.ru>
>> wrote:
>>
>> Hello!
>> I wrote some tests(it's just 01_rep_changes.pl but for non superuser) and
>> fix `DROP TABLE` from subscription. Now old and new tests pass.
>>
>> 22.11.2018, 16:23, "Evgeniy Efimkin" <efim...@yandex-team.ru>:
>>> Hello!
>>> New draft attached with filtering table in subscription (ADD/DROP) and
>>> allow non-superusers use` CREATE SUBSCRIPTION` for own tables.
>
> I've looked into the patch. The code looks good and coherent to nearby code.
> There are no docs, obviously, there is WiP.
>
> I've got few questions:
> 1. How will the subscription work for inherited tables? Do we need tests for
> that?
> 2. ALTER PUBLICATION has ADD\DROP and SET. Should we add SET too? Or is there
> a reason not to do that?
> 3. Message "Must be superuser to create FOR ALL TABLES subscriptions" seems a
> bit strange to me. Also, this literal is embedded into translations. I do not
> know how we deal with it, how do we deal for example with "måste vara
> superanvändare för att skapa prenumerationer" or "для создания подписок нужно
> быть суперпользователем"? Where do we insert FOR ALL TABLES?
> 4. How does default behavior differ from FOR ALL TABLES?
> 5. Can we alter subscription FOR ALL TABLES? Drop some tables out of the
> subscription?
>
> Best regards, Andrey Borodin.

--------
Evgeniy Efimkin
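For reference, the table bookkeeping behind `ALTER SUBSCRIPTION ... SET TABLE` (answers 2 and 5 above) can be sketched outside the backend. This is only a minimal standalone illustration in plain C, not code from the patch: `Oid` is redefined locally, `reconcile_set_table` and `oid_in_sorted` are hypothetical names, and plain arrays stand in for the subscription catalog and the publisher's table list. It assumes the rule the attached draft appears to follow: a requested table is added only if it is not yet subscribed and is present in the publication, and a subscribed table is removed if it is not in the requested list.

```c
#include <stdlib.h>

/* Local stand-in for the backend's Oid type (assumption for this sketch). */
typedef unsigned int Oid;

/* Comparator usable with both qsort() and bsearch(). */
static int
oid_cmp(const void *a, const void *b)
{
    Oid x = *(const Oid *) a;
    Oid y = *(const Oid *) b;

    return (x > y) - (x < y);
}

/* Returns 1 if key is present in the sorted array, 0 otherwise. */
static int
oid_in_sorted(Oid key, const Oid *sorted, size_t n)
{
    return n > 0 && bsearch(&key, sorted, n, sizeof(Oid), oid_cmp) != NULL;
}

/*
 * Hypothetical helper: compute which tables SET TABLE would add and remove.
 *
 * The three input arrays are sorted in place. to_add must have room for
 * n_req entries and to_remove for n_sub entries.
 */
void
reconcile_set_table(Oid *subscribed, size_t n_sub,
                    Oid *requested, size_t n_req,
                    Oid *published, size_t n_pub,
                    Oid *to_add, size_t *n_add,
                    Oid *to_remove, size_t *n_remove)
{
    size_t i;

    qsort(subscribed, n_sub, sizeof(Oid), oid_cmp);
    qsort(requested, n_req, sizeof(Oid), oid_cmp);
    qsort(published, n_pub, sizeof(Oid), oid_cmp);

    /* Add: requested, not yet subscribed, and actually published. */
    *n_add = 0;
    for (i = 0; i < n_req; i++)
    {
        if (!oid_in_sorted(requested[i], subscribed, n_sub) &&
            oid_in_sorted(requested[i], published, n_pub))
            to_add[(*n_add)++] = requested[i];
    }

    /* Remove: currently subscribed but missing from the requested list. */
    *n_remove = 0;
    for (i = 0; i < n_sub; i++)
    {
        if (!oid_in_sorted(subscribed[i], requested, n_req))
            to_remove[(*n_remove)++] = subscribed[i];
    }
}
```

With subscribed = {30, 10, 20}, requested = {20, 40, 50} and published = {10, 20, 40}, the sketch reports 40 as the only table to add and 10 and 30 as the tables to remove; 50 is skipped because it is not in the publication. Sorting once and using binary search keeps each lookup logarithmic, which matters because the subscribed list can potentially cover every table in the database.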
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index e136aa6a0b..0782dd40f0 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -38,6 +38,7 @@ #include "utils/syscache.h" + static List *textarray_to_stringlist(ArrayType *textarray); /* @@ -70,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->alltables = subform->suballtables; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 9021463a4c..58f71a227c 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -30,6 +30,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" @@ -322,6 +323,22 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) char originname[NAMEDATALEN]; bool create_slot; List *publications; + AclResult aclresult; + bool alltables; + + alltables = !stmt->tables; + /* FOR ALL TABLES requires superuser */ + if (alltables && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to create subscriptions"), + errhint("Use CREATE SUBSCRIPTION ... FOR TABLE ...")))); + + /* must have CREATE privilege on database */ + aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_DATABASE, + get_database_name(MyDatabaseId)); /* * Parse and check options. @@ -342,11 +359,6 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) if (create_slot) PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... 
WITH (create_slot = true)"); - if (!superuser()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - (errmsg("must be superuser to create subscriptions")))); - rel = heap_open(SubscriptionRelationId, RowExclusiveLock); /* Check if name is used */ @@ -375,6 +387,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) /* Check the connection info string. */ walrcv_check_conninfo(conninfo); + walrcv_connstr_check(conninfo); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -388,6 +401,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_suballtables - 1] = BoolGetDatum(alltables); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -411,6 +425,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); replorigin_create(originname); + + if (stmt->tables&&!connect) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot create subscription with connect = false and FOR TABLE"))); + } /* * Connect to remote side to execute requested commands and fetch table * info. @@ -423,6 +444,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) List *tables; ListCell *lc; char table_state; + List *tablesiods = NIL; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -438,6 +460,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + walrcv_security_check(wrconn); /* * Get the table list from publisher and build local table status * info. 
@@ -446,17 +469,48 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
             foreach(lc, tables)
             {
                 RangeVar   *rv = (RangeVar *) lfirst(lc);
-                Oid         relid;
-
-                relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-                /* Check for supported relkind. */
-                CheckSubscriptionRelkind(get_rel_relkind(relid),
-                                         rv->schemaname, rv->relname);
+                Oid         relid;
 
-                AddSubscriptionRelState(subid, relid, table_state,
-                                        InvalidXLogRecPtr);
+                relid = RangeVarGetRelid(rv, NoLock, true);
+                tablesiods = lappend_oid(tablesiods, relid);
             }
+            if (stmt->tables)
+                foreach(lc, stmt->tables)
+                {
+                    RangeVar   *rv = (RangeVar *) lfirst(lc);
+                    Oid         relid;
+
+                    relid = RangeVarGetRelid(rv, AccessShareLock, false);
+                    if (!pg_class_ownercheck(relid, GetUserId()))
+                        aclcheck_error(ACLCHECK_NOT_OWNER,
+                                       get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+                    CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                             rv->schemaname, rv->relname);
+                    if (!list_member_oid(tablesiods, relid))
+                        ereport(ERROR,
+                                (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                 errmsg("table \"%s.%s\" not present in publication",
+                                        get_namespace_name(get_rel_namespace(relid)),
+                                        get_rel_name(relid))));
+                    AddSubscriptionRelState(subid, relid, table_state,
+                                            InvalidXLogRecPtr);
+                }
+            else
+                foreach(lc, tables)
+                {
+                    RangeVar   *rv = (RangeVar *) lfirst(lc);
+                    Oid         relid;
+
+                    relid = RangeVarGetRelid(rv, AccessShareLock, false);
+                    if (!pg_class_ownercheck(relid, GetUserId()))
+                        aclcheck_error(ACLCHECK_NOT_OWNER,
+                                       get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+                    CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                             rv->schemaname, rv->relname);
+                    table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+                    AddSubscriptionRelState(subid, relid, table_state,
+                                            InvalidXLogRecPtr);
+                }
 
             /*
              * If requested, create permanent slot for the subscription.
We @@ -503,6 +557,242 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) return myself; } +static void +AlterSubscription_set_table(Subscription *sub, List *tables, bool copy_data) +{ + char *err; + List *pubrel_names; + List *subrel_states; + Oid *subrel_local_oids; + Oid *pubrel_local_oids; + Oid *stmt_local_oids; + ListCell *lc; + int off; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Get the table list from publisher. */ + pubrel_names = fetch_table_list(wrconn, sub->publications); + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + + /* Get local table list. */ + subrel_states = GetSubscriptionRelations(sub->oid); + + /* + * Build qsorted array of local table oids for faster lookup. This can + * potentially contain all tables in the database so speed of lookup is + * important. 
+     */
+    subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+    off = 0;
+    foreach(lc, subrel_states)
+    {
+        SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+        subrel_local_oids[off++] = relstate->relid;
+    }
+    qsort(subrel_local_oids, list_length(subrel_states),
+          sizeof(Oid), oid_cmp);
+
+    stmt_local_oids = palloc(list_length(tables) * sizeof(Oid));
+    off = 0;
+    foreach(lc, tables)
+    {
+        RangeVar   *rv = (RangeVar *) lfirst(lc);
+        Oid         relid;
+
+        relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+        stmt_local_oids[off++] = relid;
+    }
+    qsort(stmt_local_oids, list_length(tables),
+          sizeof(Oid), oid_cmp);
+
+    pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+    off = 0;
+    foreach(lc, pubrel_names)
+    {
+        RangeVar   *rv = (RangeVar *) lfirst(lc);
+        Oid         relid;
+
+        relid = RangeVarGetRelid(rv, NoLock, true);
+
+        pubrel_local_oids[off++] = relid;
+    }
+    qsort(pubrel_local_oids, list_length(pubrel_names),
+          sizeof(Oid), oid_cmp);
+
+    /*
+     * Walk over the remote tables and try to match them to locally known
+     * tables. If the table is not known locally create a new state for it.
+     *
+     * Also builds array of local oids of remote tables for the next step.
+     */
+
+
+    foreach(lc, tables)
+    {
+        RangeVar   *rv = (RangeVar *) lfirst(lc);
+        Oid         relid;
+
+        relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+        /* Check for supported relkind. */
+        CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                 rv->schemaname, rv->relname);
+
+        if (!bsearch(&relid, subrel_local_oids,
+                     list_length(subrel_states), sizeof(Oid), oid_cmp) &&
+            bsearch(&relid, pubrel_local_oids,
+                    list_length(pubrel_names), sizeof(Oid), oid_cmp))
+        {
+            AddSubscriptionRelState(sub->oid, relid,
+                                    copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+                                    InvalidXLogRecPtr);
+            ereport(DEBUG1,
+                    (errmsg("table \"%s.%s\" added to subscription \"%s\"",
+                            rv->schemaname, rv->relname, sub->name)));
+        }
+    }
+
+    /*
+     * Next remove state for tables we should not care about anymore using the
+     * data we collected above
+     */
+
+    for (off = 0; off < list_length(subrel_states); off++)
+    {
+        Oid         relid = subrel_local_oids[off];
+
+        if (!bsearch(&relid, stmt_local_oids,
+                     list_length(tables), sizeof(Oid), oid_cmp))
+        {
+            RemoveSubscriptionRel(sub->oid, relid);
+
+            logicalrep_worker_stop_at_commit(sub->oid, relid);
+
+            ereport(DEBUG1,
+                    (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+                            get_namespace_name(get_rel_namespace(relid)),
+                            get_rel_name(relid),
+                            sub->name)));
+        }
+    }
+}
+
+static void
+AlterSubscription_drop_table(Subscription *sub, List *tables)
+{
+    List       *subrel_states;
+    Oid        *subrel_local_oids;
+    ListCell   *lc;
+    int         off;
+
+    Assert(list_length(tables) > 0);
+    subrel_states = GetSubscriptionRelations(sub->oid);
+    subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+    off = 0;
+    foreach(lc, subrel_states)
+    {
+        SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+        subrel_local_oids[off++] = relstate->relid;
+    }
+    qsort(subrel_local_oids, list_length(subrel_states),
+          sizeof(Oid), oid_cmp);
+
+    foreach(lc, tables)
+    {
+        RangeVar   *rv = (RangeVar *) lfirst(lc);
+        Oid         relid;
+
+        relid = RangeVarGetRelid(rv, AccessShareLock, false);
+        CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                 rv->schemaname, rv->relname);
+        if (!bsearch(&relid, subrel_local_oids,
+                     list_length(subrel_states), sizeof(Oid), oid_cmp))
+        {
+            ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
+                    errmsg("table \"%s.%s\" not present in subscription",
+                           get_namespace_name(get_rel_namespace(relid)),
+                           get_rel_name(relid))));
+        }
+        else
+        {
+            RemoveSubscriptionRel(sub->oid, relid);
+            logicalrep_worker_stop_at_commit(sub->oid, relid);
+        }
+
+    }
+}
+
+static void
+AlterSubscription_add_table(Subscription *sub, List *tables, bool copy_data)
+{
+    char       *err;
+    List       *pubrel_names;
+    ListCell   *lc;
+    List       *pubrels = NIL;
+
+    Assert(list_length(tables) > 0);
+
+    /* Load the library providing us libpq calls. */
+    load_file("libpqwalreceiver", false);
+
+    /* Try to connect to the publisher. */
+    wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+    if (!wrconn)
+        ereport(ERROR,
+                (errmsg("could not connect to the publisher: %s", err)));
+
+    /* Get the table list from publisher. */
+    pubrel_names = fetch_table_list(wrconn, sub->publications);
+    /* Get local oids of the published rels */
+    foreach(lc, pubrel_names)
+    {
+        RangeVar   *rv = (RangeVar *) lfirst(lc);
+        Oid         relid;
+
+        relid = RangeVarGetRelid(rv, NoLock, true);
+        pubrels = lappend_oid(pubrels, relid);
+    }
+
+    /* We are done with the remote side, close connection. */
+    walrcv_disconnect(wrconn);
+
+    foreach(lc, tables)
+    {
+        RangeVar   *rv = (RangeVar *) lfirst(lc);
+        Oid         relid;
+        char        table_state;
+
+        relid = RangeVarGetRelid(rv, AccessShareLock, false);
+        if (!pg_class_ownercheck(relid, GetUserId()))
+            aclcheck_error(ACLCHECK_NOT_OWNER,
+                           get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
+        CheckSubscriptionRelkind(get_rel_relkind(relid),
+                                 rv->schemaname, rv->relname);
+        if (!list_member_oid(pubrels, relid))
+            ereport(ERROR,
+                    (errcode(ERRCODE_UNDEFINED_OBJECT),
+                     errmsg("table \"%s.%s\" not present in publication",
+                            get_namespace_name(get_rel_namespace(relid)),
+                            get_rel_name(relid))));
+        table_state = copy_data ?
SUBREL_STATE_INIT : SUBREL_STATE_READY; + AddSubscriptionRelState(sub->oid, relid, + table_state, + InvalidXLogRecPtr); + } +} + static void AlterSubscription_refresh(Subscription *sub, bool copy_data) { @@ -625,6 +915,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool update_tuple = false; Subscription *sub; Form_pg_subscription form; + char *err = NULL; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -721,10 +1012,31 @@ AlterSubscription(AlterSubscriptionStmt *stmt) } case ALTER_SUBSCRIPTION_CONNECTION: - /* Load the library providing us libpq calls. */ - load_file("libpqwalreceiver", false); - /* Check the connection info string. */ - walrcv_check_conninfo(stmt->conninfo); + { + walrcv_check_conninfo(stmt->conninfo); + walrcv_connstr_check(stmt->conninfo); + if (sub->enabled) + { + /* Load the library providing us libpq calls. */ + /* Check the connection info string. */ + load_file("libpqwalreceiver", false); + wrconn = walrcv_connect(stmt->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + PG_TRY(); + { + walrcv_security_check(wrconn); + } + PG_CATCH(); + { + /* Close the connection in case of failure. */ + walrcv_disconnect(wrconn); + PG_RE_THROW(); + } + PG_END_TRY(); + } + } values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(stmt->conninfo); @@ -773,6 +1085,12 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + if (!sub->alltables) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... 
REFRESH is not allowed for FOR TABLE subscriptions"),
+                             errhint("Use ALTER SUBSCRIPTION ADD/DROP TABLE ...")));
+
                 parse_subscription_options(stmt->options, NULL, NULL, NULL,
                                            NULL, NULL, NULL, &copy_data,
@@ -782,7 +1100,73 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
                 break;
             }
+        case ALTER_SUBSCRIPTION_ADD_TABLE:
+            {
+                bool        copy_data;
+
+                if (!sub->enabled)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_SYNTAX_ERROR),
+                             errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not allowed for disabled subscriptions")));
+
+                if (sub->alltables)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_SYNTAX_ERROR),
+                             errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not allowed for FOR ALL TABLES subscriptions"),
+                             errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
+
+                parse_subscription_options(stmt->options, NULL, NULL, NULL,
+                                           NULL, NULL, NULL, &copy_data,
+                                           NULL, NULL);
+                AlterSubscription_add_table(sub, stmt->tables, copy_data);
+
+                break;
+            }
+        case ALTER_SUBSCRIPTION_DROP_TABLE:
+            {
+
+                if (!sub->enabled)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_SYNTAX_ERROR),
+                             errmsg("ALTER SUBSCRIPTION ... DROP TABLE is not allowed for disabled subscriptions")));
+
+                if (sub->alltables)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_SYNTAX_ERROR),
+                             errmsg("ALTER SUBSCRIPTION ... DROP TABLE is not allowed for FOR ALL TABLES subscriptions"),
+                             errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
+
+                parse_subscription_options(stmt->options, NULL, NULL, NULL,
+                                           NULL, NULL, NULL, NULL,
+                                           NULL, NULL);
+
+                AlterSubscription_drop_table(sub, stmt->tables);
+
+                break;
+            }
+        case ALTER_SUBSCRIPTION_SET_TABLE:
+            {
+                bool        copy_data;
+                if (!sub->enabled)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_SYNTAX_ERROR),
+                             errmsg("ALTER SUBSCRIPTION ... SET TABLE is not allowed for disabled subscriptions")));
+
+                if (sub->alltables)
+                    ereport(ERROR,
+                            (errcode(ERRCODE_SYNTAX_ERROR),
+                             errmsg("ALTER SUBSCRIPTION ... SET TABLE is not allowed for FOR ALL TABLES subscriptions"),
+                             errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
+
+                parse_subscription_options(stmt->options, NULL, NULL, NULL,
+                                           NULL, NULL, NULL, &copy_data,
+                                           NULL, NULL);
+
+                AlterSubscription_set_table(sub, stmt->tables, copy_data);
+
+                break;
+            }
 
         default:
             elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
                  stmt->kind);
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index db49968409..b929c26adc 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4612,6 +4612,8 @@ _copyCreateSubscriptionStmt(const CreateSubscriptionStmt *from)
     COPY_STRING_FIELD(conninfo);
     COPY_NODE_FIELD(publication);
     COPY_NODE_FIELD(options);
+    COPY_NODE_FIELD(tables);
+    COPY_SCALAR_FIELD(for_all_tables);
 
     return newnode;
 }
@@ -4625,6 +4627,7 @@ _copyAlterSubscriptionStmt(const AlterSubscriptionStmt *from)
     COPY_STRING_FIELD(subname);
     COPY_STRING_FIELD(conninfo);
     COPY_NODE_FIELD(publication);
+    COPY_NODE_FIELD(tables);
     COPY_NODE_FIELD(options);
 
     return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3a084b4d1f..1082918ff1 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2238,6 +2238,8 @@ _equalCreateSubscriptionStmt(const CreateSubscriptionStmt *a,
     COMPARE_STRING_FIELD(conninfo);
     COMPARE_NODE_FIELD(publication);
     COMPARE_NODE_FIELD(options);
+    COMPARE_NODE_FIELD(tables);
+    COMPARE_SCALAR_FIELD(for_all_tables);
 
     return true;
 }
@@ -2250,6 +2252,7 @@ _equalAlterSubscriptionStmt(const AlterSubscriptionStmt *a,
     COMPARE_STRING_FIELD(subname);
     COMPARE_STRING_FIELD(conninfo);
     COMPARE_NODE_FIELD(publication);
+    COMPARE_NODE_FIELD(tables);
     COMPARE_NODE_FIELD(options);
 
     return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 2c2208ffb7..0b1e3a9db5 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -395,7 +395,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
                 execute_param_clause using_clause returning_clause
                 opt_enum_val_list
enum_val_list table_func_column_list create_generic_options alter_generic_options - relation_expr_list dostmt_opt_list + relation_expr_list remote_relation_expr_list dostmt_opt_list transform_element_list transform_type_list TriggerTransitions TriggerReferencing publication_name_list @@ -405,6 +405,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause %type <node> opt_publication_for_tables publication_for_tables +%type <node> opt_subscription_for_tables subscription_for_tables %type <value> publication_name_item %type <list> opt_fdw_options fdw_options @@ -489,6 +490,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <node> table_ref %type <jexpr> joined_table %type <range> relation_expr +%type <range> remote_relation_expr %type <range> relation_expr_opt_alias %type <node> tablesample_clause opt_repeatable_clause %type <target> target_el set_target insert_column_item @@ -9565,7 +9567,7 @@ AlterPublicationStmt: *****************************************************************************/ CreateSubscriptionStmt: - CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition + CREATE SUBSCRIPTION name CONNECTION Sconst PUBLICATION publication_name_list opt_definition opt_subscription_for_tables { CreateSubscriptionStmt *n = makeNode(CreateSubscriptionStmt); @@ -9573,9 +9575,33 @@ CreateSubscriptionStmt: n->conninfo = $5; n->publication = $7; n->options = $8; + if ($9 != NULL) + { + /* FOR TABLE */ + if (IsA($9, List)) + n->tables = (List *)$9; + /* FOR ALL TABLES */ + else + n->for_all_tables = true; + } $$ = (Node *)n; } ; +opt_subscription_for_tables: + subscription_for_tables { $$ = $1; } + | /* EMPTY */ { $$ = NULL; } + ; + +subscription_for_tables: + FOR TABLE remote_relation_expr_list + { + $$ = (Node *) $3; + } + | FOR ALL TABLES + { + $$ = (Node *) 
makeInteger(true); + } + ; publication_name_list: publication_name_item @@ -9655,6 +9681,37 @@ AlterSubscriptionStmt: (Node *)makeInteger(false), @1)); $$ = (Node *)n; } + | ALTER SUBSCRIPTION name ADD_P TABLE remote_relation_expr_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ADD_TABLE; + n->subname = $3; + n->tables = $6; + n->options = $7; + n->tableAction = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name DROP TABLE remote_relation_expr_list + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_DROP_TABLE; + n->subname = $3; + n->tables = $6; + n->tableAction = DEFELEM_DROP; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name SET TABLE remote_relation_expr_list + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_SET_TABLE; + n->subname = $3; + n->tables = $6; + n->tableAction = DEFELEM_SET; + $$ = (Node *)n; + } ; /***************************************************************************** @@ -12094,6 +12151,23 @@ relation_expr_list: | relation_expr_list ',' relation_expr { $$ = lappend($1, $3); } ; +remote_relation_expr: + qualified_name + { + /* no inheritance */ + $$ = $1; + $$->inh = false; + $$->alias = NULL; + } + ; + + +remote_relation_expr_list: + remote_relation_expr { $$ = list_make1($1); } + | remote_relation_expr_list ',' remote_relation_expr { $$ = lappend($1, $3); } + ; + + /* * Given "UPDATE foo set set ...", we have to decide without looking any diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9b75711ebd..49c5b68858 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -52,6 +52,9 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo, bool logical, const char *appname, char **err); 
static void libpqrcv_check_conninfo(const char *conninfo); +static void libpqrcv_connstr_check(const char *connstr); +static void libpqrcv_security_check(WalReceiverConn *conn); + static char *libpqrcv_get_conninfo(WalReceiverConn *conn); static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); @@ -83,6 +86,8 @@ static void libpqrcv_disconnect(WalReceiverConn *conn); static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, libpqrcv_check_conninfo, + libpqrcv_connstr_check, + libpqrcv_security_check, libpqrcv_get_conninfo, libpqrcv_get_senderinfo, libpqrcv_identify_system, @@ -232,6 +237,54 @@ libpqrcv_check_conninfo(const char *conninfo) PQconninfoFree(opts); } +static void +libpqrcv_security_check(WalReceiverConn *conn) +{ + if (!superuser()) + { + if (!PQconnectionUsedPassword(conn->streamConn)) + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superuser cannot connect if the server does not request a password."), + errhint("Target server's authentication method must be changed."))); + } +} + +static void +libpqrcv_connstr_check(const char *connstr) +{ + if (!superuser()) + { + PQconninfoOption *options; + PQconninfoOption *option; + bool connstr_gives_password = false; + + options = PQconninfoParse(connstr, NULL); + if (options) + { + for (option = options; option->keyword != NULL; option++) + { + if (strcmp(option->keyword, "password") == 0) + { + if (option->val != NULL && option->val[0] != '\0') + { + connstr_gives_password = true; + break; + } + } + } + PQconninfoFree(options); + } + + if (!connstr_gives_password) + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("password is required"), + errdetail("Non-superusers must provide a password in the connection string."))); + } +} + /* * Return a user-displayable conninfo string. Any security-sensitive fields * are obfuscated. 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 1f20df5680..d4c14e3e17 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -77,6 +77,28 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) } } +/* + * Relcache invalidation callback for all relation map cache. + */ +void +logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid, uint32 hashvalue) +{ + LogicalRepRelMapEntry *entry; + /* invalidate all cache entries */ + if (LogicalRepRelMap == NULL) + return; + HASH_SEQ_STATUS status; + hash_seq_init(&status, LogicalRepRelMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + entry->localreloid = InvalidOid; + entry->state = SUBREL_STATE_UNKNOWN; + } +} + + + /* * Initialize the relation map cache. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8d5e0946c4..465c36632a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1741,6 +1741,9 @@ ApplyWorkerMain(Datum main_arg) CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, (Datum) 0); + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, + logicalrep_relmap_invalidate_cb2, + (Datum) 0); /* Build logical replication streaming options. 
*/ options.logical = true; diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 4298c3cbf2..3534459bd6 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -47,6 +47,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subenabled; /* True if the subscription is enabled (the * worker should be running) */ + bool suballtables; #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ @@ -77,6 +78,7 @@ typedef struct Subscription char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ + bool alltables; } Subscription; extern Subscription *GetSubscription(Oid subid, bool missing_ok); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e5bdc1cec5..a2c18fbd08 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3475,6 +3475,8 @@ typedef struct CreateSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ + List *tables; /* Optional list of tables to add */ + bool for_all_tables; /* Special subscription for all tables in publication */ } CreateSubscriptionStmt; typedef enum AlterSubscriptionType @@ -3483,7 +3485,10 @@ typedef enum AlterSubscriptionType ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, - ALTER_SUBSCRIPTION_ENABLED + ALTER_SUBSCRIPTION_ENABLED, + ALTER_SUBSCRIPTION_DROP_TABLE, + ALTER_SUBSCRIPTION_ADD_TABLE, + ALTER_SUBSCRIPTION_SET_TABLE } AlterSubscriptionType; typedef struct AlterSubscriptionStmt @@ -3494,6 +3499,10 @@ typedef struct AlterSubscriptionStmt char *conninfo; /* Connection string to publisher */ List *publication; /* One or more 
publication to subscribe to */
     List       *options;        /* List of DefElem nodes */
+    /* parameters used for ALTER SUBSCRIPTION ... ADD/DROP/SET TABLE */
+    List       *tables;         /* List of tables to add/drop/set */
+    bool        for_all_tables; /* Special subscription for all tables in publication */
+    DefElemAction tableAction;  /* What action to perform with the tables */
 } AlterSubscriptionStmt;
 
 typedef struct DropSubscriptionStmt
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 73e4805827..4fb95c1d03 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -38,5 +38,7 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 
 extern void logicalrep_typmap_update(LogicalRepTyp *remotetyp);
 extern char *logicalrep_typmap_gettypname(Oid remoteid);
+void logicalrep_relmap_invalidate_cb2(Datum arg, int cacheid,
+                                      uint32 hashvalue);
 
 #endif                          /* LOGICALRELATION_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 5913b580c2..fd7c710547 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -204,6 +204,8 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica
                                                const char *appname,
                                                char **err);
 typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
+typedef void (*walrcv_connstr_check_fn) (const char *connstr);
+typedef void (*walrcv_security_check_fn) (WalReceiverConn *conn);
 typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
 typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
                                           char **sender_host,
@@ -237,6 +239,8 @@ typedef struct WalReceiverFunctionsType
 {
     walrcv_connect_fn walrcv_connect;
     walrcv_check_conninfo_fn walrcv_check_conninfo;
+    walrcv_connstr_check_fn walrcv_connstr_check;
+    walrcv_security_check_fn walrcv_security_check;
     walrcv_get_conninfo_fn walrcv_get_conninfo;
     walrcv_get_senderinfo_fn walrcv_get_senderinfo;
     walrcv_identify_system_fn
walrcv_identify_system;
@@ -256,6 +260,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_connect(conninfo, logical, appname, err)
 #define walrcv_check_conninfo(conninfo) \
 	WalReceiverFunctions->walrcv_check_conninfo(conninfo)
+#define walrcv_connstr_check(connstr) \
+	WalReceiverFunctions->walrcv_connstr_check(connstr)
+#define walrcv_security_check(conn) \
+	WalReceiverFunctions->walrcv_security_check(conn)
 #define walrcv_get_conninfo(conn) \
 	WalReceiverFunctions->walrcv_get_conninfo(conn)
 #define walrcv_get_senderinfo(conn, sender_host, sender_port) \
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 4fcbf7efe9..d19da3c01a 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -44,6 +44,7 @@ ERROR:  subscription "testsub" already exists
 SET SESSION AUTHORIZATION 'regress_subscription_user2';
 CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION foo WITH (connect = false);
 ERROR:  must be superuser to create subscriptions
+HINT:  Use CREATE SUBSCRIPTION ... FOR TABLE ...
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 -- fail - invalid option combinations
 CREATE SUBSCRIPTION testsub2 CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
diff --git a/src/test/subscription/t/011_rep_changes_nonsuperuser.pl b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl
new file mode 100644
index 0000000000..3acbb5663c
--- /dev/null
+++ b/src/test/subscription/t/011_rep_changes_nonsuperuser.pl
@@ -0,0 +1,316 @@
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More;
+
+if ($windows_os)
+{
+	plan skip_all => "authentication tests cannot run on Windows";
+}
+else
+{
+	plan tests => 18;
+}
+
+sub reset_pg_hba
+{
+	my $node = shift;
+	my $hba_method = shift;
+
+	unlink($node->data_dir .
'/pg_hba.conf');
+	$node->append_conf('pg_hba.conf', "local all normal $hba_method");
+	$node->append_conf('pg_hba.conf', "local all all trust");
+	$node->reload;
+	return;
+}
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_subscriber->safe_psql('postgres',
+	"SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';");
+$node_subscriber->safe_psql('postgres',
+	"GRANT CREATE ON DATABASE postgres TO normal;");
+$node_subscriber->safe_psql('postgres',
+	"ALTER ROLE normal WITH LOGIN;");
+reset_pg_hba($node_subscriber, 'trust');
+
+
+$node_publisher->safe_psql('postgres',
+	"SET password_encryption='md5'; CREATE ROLE normal LOGIN PASSWORD 'pass';");
+$node_publisher->safe_psql('postgres',
+	"ALTER ROLE normal WITH LOGIN; ALTER ROLE normal WITH SUPERUSER");
+reset_pg_hba($node_publisher, 'md5');
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_mixed (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_mixed (a, b) VALUES (1, 'foo')");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_include (a int, b
text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
+);
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)", extra_params => [ '-U', 'normal' ]);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rep (a int primary key)", extra_params => [ '-U', 'normal' ]);
+
+# different column count and order than on publisher
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_mixed (c text, b text, a int primary key)", extra_params => [ '-U', 'normal' ]);
+
+# replication of the table with included index
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
+, extra_params => [ '-U', 'normal' ]);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr .
' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_notrep"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr password=pass user=normal application_name=$appname'
+	PUBLICATION tap_pub, tap_pub_ins_only
+	FOR TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_ins",
+	extra_params => [ '-U', 'normal' ]);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
+is($result, qq(0), 'check non-replicated table is empty on subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rep SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO
tab_mixed VALUES (2, 'bar')");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_include SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab_include WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT c, b, a FROM tab_mixed");
+is( $result, qq(|foo|1
+|bar|2), 'check replicated changes with different column order');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_include");
+is($result, qq(20|-20|-1),
+	'check replicated changes with primary key index with included columns');
+
+# insert some duplicate rows
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_full SELECT generate_series(1,10)");
+
+# add REPLICA IDENTITY FULL so we can update
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+
+# and do the updates
+$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(20|1|100),
+	'update works with REPLICA IDENTITY FULL and duplicate tuples');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT x FROM tab_full2 ORDER BY 1");
+is( $result, qq(a
+bb
+bb),
+	'update works with REPLICA IDENTITY FULL and text datums');
+
+# check that change of connection string and/or publication list causes
+# restart of subscription workers. Not all of these are registered as tests
+# as we need to poll for a change but the test suite will fail none the less
+# when something goes wrong.
+my $oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub CONNECTION 'application_name=$appname $publisher_connstr'"
+);
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';"
+) or die "Timed out while waiting for apply to restart";
+
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)"
+);
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';"
+) or die "Timed out while waiting for apply to restart";
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
+
+# Restart the publisher and check the state of the subscriber which
+# should be in a streaming state after catching up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1152|1|1100),
+	'check replicated inserts after subscription publication change');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1),
+	'check changes skipped after subscription publication change');
+
+# check alter publication (relcache invalidation etc)
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub ADD TABLE tab_full WITH (copy_data = false)");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
+
+$node_publisher->wait_for_catchup($appname);
+
+# note that data are different on provider and subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002),
+	'check replicated deletes after alter publication');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(21|0|100), 'check replicated insert after alter publication');
+
+# check drop table from subscription
+$result = $node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub DROP TABLE tab_full");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(-1)");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(21|0|100), 'check insert is not replicated after drop table from subscription');
+
+# check restart on rename
+$oldpid =
$node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed");
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';"
+) or die "Timed out while waiting for apply to restart";
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');