diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 4f99ebb447..395fe530b3 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -399,6 +399,9 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TYPE:
 			return RenameType(stmt);
 
+		case OBJECT_PUBLICATION:
+			return RenamePublication(stmt->subname, stmt->newname);
+
 		case OBJECT_AGGREGATE:
 		case OBJECT_COLLATION:
 		case OBJECT_CONVERSION:
@@ -416,7 +419,6 @@ ExecRenameStmt(RenameStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSPARSER:
 		case OBJECT_TSTEMPLATE:
-		case OBJECT_PUBLICATION:
 		case OBJECT_SUBSCRIPTION:
 			{
 				ObjectAddress address;
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index d6ffef374e..86b270d1cf 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -433,6 +433,108 @@ pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestor
 	return result;
 }
 
+/*
+ * Execute ALTER PUBLICATION RENAME
+ *
+ * Renames the publication called "oldname" to "newname" and returns the
+ * address of the renamed publication.  Raises an error if the publication
+ * does not exist, if the current user does not own it, or if another
+ * publication already uses the new name.
+ */
+ObjectAddress
+RenamePublication(const char *oldname, const char *newname)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	HeapTuple	newtup;
+	ObjectAddress address;
+	Form_pg_publication pubform;
+	bool		replaces[Natts_pg_publication];
+	bool		nulls[Natts_pg_publication];
+	Datum		values[Natts_pg_publication];
+
+	rel = table_open(PublicationRelationId, RowExclusiveLock);
+
+	tup = SearchSysCacheCopy1(PUBLICATIONNAME,
+							  CStringGetDatum(oldname));
+
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("publication \"%s\" does not exist",
+						oldname)));
+
+	pubform = (Form_pg_publication) GETSTRUCT(tup);
+
+	/* must be owner */
+	if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
+		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
+					   NameStr(pubform->pubname));
+
+	/*
+	 * Reject a name that is already in use, so that the user gets a clear
+	 * error rather than a unique-index violation from the catalog update.
+	 */
+	if (SearchSysCacheExists1(PUBLICATIONNAME, CStringGetDatum(newname)))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("publication \"%s\" already exists",
+						newname)));
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Only update the pubname */
+	values[Anum_pg_publication_pubname - 1] =
+		DirectFunctionCall1(namein, CStringGetDatum(newname));
+	replaces[Anum_pg_publication_pubname - 1] = true;
+
+	/*
+	 * Keep the original tuple around: pubform points into it and is still
+	 * needed below.
+	 */
+	newtup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							   replaces);
+
+	/* Invalidate the relcache. */
+	if (pubform->puballtables)
+	{
+		CacheInvalidateRelcacheAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * XXX: all tables in the tree are listed now, but this may be too much.
+		 */
+		relids = GetPublicationRelations(pubform->oid,
+										 PUBLICATION_PART_ALL);
+		schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
+														PUBLICATION_PART_ALL);
+
+		relids = list_concat_unique_oid(relids, schemarelids);
+
+		InvalidatePublicationRels(relids);
+	}
+
+	CatalogTupleUpdate(rel, &newtup->t_self, newtup);
+
+	InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
+
+	ObjectAddressSet(address, PublicationRelationId, pubform->oid);
+
+	heap_freetuple(newtup);
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	return address;
+}
+
 /* check_functions_in_node callback */
 static bool
 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
@@ -1920,6 +2022,30 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 	}
 
 	form->pubowner = newOwnerId;
+
+	/* Invalidate the relcache. */
+	if (form->puballtables)
+	{
+		CacheInvalidateRelcacheAll();
+	}
+	else
+	{
+		List	   *relids = NIL;
+		List	   *schemarelids = NIL;
+
+		/*
+		 * XXX: all tables in the tree are listed now, but this may be too much.
+ */ + relids = GetPublicationRelations(form->oid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemaPublicationRelations(form->oid, + PUBLICATION_PART_ALL); + + relids = list_concat_unique_oid(relids, schemarelids); + + InvalidatePublicationRels(relids); + } + CatalogTupleUpdate(rel, &tup->t_self, tup); /* Update owner dependency reference */ diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 4aa8646af7..ec10bfdd8c 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9466,7 +9466,7 @@ RenameStmt: ALTER AGGREGATE aggregate_with_argtypes RENAME TO name RenameStmt *n = makeNode(RenameStmt); n->renameType = OBJECT_PUBLICATION; - n->object = (Node *) makeString($3); + n->subname = $3; n->newname = $6; n->missing_ok = false; $$ = (Node *) n; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 1d80d27d0f..b280532a3a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -218,7 +218,6 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); -static void rel_sync_cache_publicationrel_cb(Datum arg, Oid pubid); static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid); static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, @@ -1917,8 +1916,6 @@ init_rel_sync_cache(MemoryContext cachectx) rel_sync_cache_publication_cb, (Datum) 0); - CacheRegisterPubcacheCallback(rel_sync_cache_publicationrel_cb, (Datum) 0); - relation_callbacks_registered = true; } @@ -2317,29 +2314,6 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) } } -/* - * Publication invalidation callback - */ -static void -rel_sync_cache_publicationrel_cb(Datum arg, Oid pubid) -{ - HASH_SEQ_STATUS status; - RelationSyncEntry *entry; - - if (RelationSyncCache 
== NULL) - return; - - hash_seq_init(&status, RelationSyncCache); - while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL) - { - if (entry->replicate_valid && list_member_oid(entry->pub_ids, pubid)) - { - entry->replicate_valid = false; - entry->pub_ids = NIL; - } - } -} - /* * Publication relation/schema map syscache invalidation callback * diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index a34be79ee6..53f00dfd13 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -160,9 +160,6 @@ */ #define CatCacheMsgs 0 #define RelCacheMsgs 1 -#define PubCacheMsgs 2 - -#define NumberofCache 3 /* Pointers to main arrays in TopTransactionContext */ typedef struct InvalMessageArray @@ -171,13 +168,13 @@ typedef struct InvalMessageArray int maxmsgs; /* current allocated size of array */ } InvalMessageArray; -static InvalMessageArray InvalMessageArrays[NumberofCache]; +static InvalMessageArray InvalMessageArrays[2]; /* Control information for one logical group of messages */ typedef struct InvalidationMsgsGroup { - int firstmsg[NumberofCache]; /* first index in relevant array */ - int nextmsg[NumberofCache]; /* last+1 index */ + int firstmsg[2]; /* first index in relevant array */ + int nextmsg[2]; /* last+1 index */ } InvalidationMsgsGroup; /* Macros to help preserve InvalidationMsgsGroup abstraction */ @@ -192,7 +189,6 @@ typedef struct InvalidationMsgsGroup do { \ SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \ SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \ - SetSubGroupToFollow(targetgroup, priorgroup, PubCacheMsgs); \ } while (0) #define NumMessagesInSubGroup(group, subgroup) \ @@ -200,9 +196,7 @@ typedef struct InvalidationMsgsGroup #define NumMessagesInGroup(group) \ (NumMessagesInSubGroup(group, CatCacheMsgs) + \ - NumMessagesInSubGroup(group, RelCacheMsgs) + \ - NumMessagesInSubGroup(group, PubCacheMsgs)) - + NumMessagesInSubGroup(group, RelCacheMsgs)) 
/*---------------- * Invalidation messages are divided into two groups: @@ -256,7 +250,6 @@ int debug_discard_caches = 0; #define MAX_SYSCACHE_CALLBACKS 64 #define MAX_RELCACHE_CALLBACKS 10 -#define MAX_PUBCACHE_CALLBACKS 10 static struct SYSCACHECALLBACK { @@ -278,14 +271,6 @@ static struct RELCACHECALLBACK static int relcache_callback_count = 0; -static struct PUBCACHECALLBACK -{ - PubcacheCallbackFunction function; - Datum arg; -} pubcache_callback_list[MAX_PUBCACHE_CALLBACKS]; - -static int pubcache_callback_count = 0; - /* ---------------------------------------------------------------- * Invalidation subgroup support functions * ---------------------------------------------------------------- @@ -478,38 +463,6 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group, AddInvalidationMessage(group, RelCacheMsgs, &msg); } -/* - * Add a publication inval entry - */ -static void -AddPubcacheInvalidationMessage(InvalidationMsgsGroup *group, - Oid dbId, Oid pubId) -{ - SharedInvalidationMessage msg; - - /* - * Don't add a duplicate item. We assume dbId need not be checked because - * it will never change. InvalidOid for relId means all relations so we - * don't need to add individual ones when it is present. 
- */ - - ProcessMessageSubGroup(group, PubCacheMsgs, - if (msg->pc.id == SHAREDINVALPUBCACHE_ID && - (msg->pc.pubId == pubId || - msg->pc.pubId == InvalidOid)) - return); - - - /* OK, add the item */ - msg.pc.id = SHAREDINVALPUBCACHE_ID; - msg.pc.dbId = dbId; - msg.pc.pubId = pubId; - /* check AddCatcacheInvalidationMessage() for an explanation */ - VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg)); - - AddInvalidationMessage(group, PubCacheMsgs, &msg); -} - /* * Add a snapshot inval entry * @@ -548,7 +501,6 @@ AppendInvalidationMessages(InvalidationMsgsGroup *dest, { AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs); AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs); - AppendInvalidationMessageSubGroup(dest, src, PubCacheMsgs); } /* @@ -563,7 +515,6 @@ ProcessInvalidationMessages(InvalidationMsgsGroup *group, { ProcessMessageSubGroup(group, CatCacheMsgs, func(msg)); ProcessMessageSubGroup(group, RelCacheMsgs, func(msg)); - ProcessMessageSubGroup(group, PubCacheMsgs, func(msg)); } /* @@ -576,7 +527,6 @@ ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group, { ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n)); ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n)); - ProcessMessageSubGroupMulti(group, PubCacheMsgs, func(msgs, n)); } /* ---------------------------------------------------------------- @@ -639,18 +589,6 @@ RegisterRelcacheInvalidation(Oid dbId, Oid relId) transInvalInfo->RelcacheInitFileInval = true; } -/* - * RegisterPubcacheInvalidation - * - * As above, but register a publication invalidation event. 
- */ -static void -RegisterPubcacheInvalidation(Oid dbId, Oid pubId) -{ - AddPubcacheInvalidationMessage(&transInvalInfo->CurrentCmdInvalidMsgs, - dbId, pubId); -} - /* * RegisterSnapshotInvalidation * @@ -721,8 +659,6 @@ PrepareInvalidationState(void) InvalMessageArrays[CatCacheMsgs].maxmsgs = 0; InvalMessageArrays[RelCacheMsgs].msgs = NULL; InvalMessageArrays[RelCacheMsgs].maxmsgs = 0; - InvalMessageArrays[PubCacheMsgs].msgs = NULL; - InvalMessageArrays[PubCacheMsgs].maxmsgs = 0; } transInvalInfo = myInfo; @@ -836,20 +772,6 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg) else if (msg->sn.dbId == MyDatabaseId) InvalidateCatalogSnapshot(); } - else if (msg->id == SHAREDINVALPUBCACHE_ID) - { - if (msg->pc.dbId == MyDatabaseId || msg->pc.dbId == InvalidOid) - { - int i; - - for (i = 0; i < pubcache_callback_count; i++) - { - struct PUBCACHECALLBACK *pcitem = pubcache_callback_list + i; - - pcitem->function(pcitem->arg, msg->pc.pubId); - } - } - } else elog(FATAL, "unrecognized SI message ID: %d", msg->id); } @@ -1021,18 +943,6 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, msgs, n * sizeof(SharedInvalidationMessage)), nmsgs += n)); - ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs, - PubCacheMsgs, - (memcpy(msgarray + nmsgs, - msgs, - n * sizeof(SharedInvalidationMessage)), - nmsgs += n)); - ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs, - PubCacheMsgs, - (memcpy(msgarray + nmsgs, - msgs, - n * sizeof(SharedInvalidationMessage)), - nmsgs += n)); Assert(nmsgs == nummsgs); return nmsgs; @@ -1401,17 +1311,6 @@ CacheInvalidateHeapTuple(Relation relation, else return; } - else if (tupleRelId == PublicationRelationId) - { - Form_pg_publication pubtup = (Form_pg_publication) GETSTRUCT(tuple); - - /* get publication id */ - relationId = pubtup->oid; - databaseId = MyDatabaseId; - - RegisterPubcacheInvalidation(databaseId, relationId); - return; - } else return; @@ -1667,25 +1566,6 @@ 
CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, ++relcache_callback_count; } -/* - * CacheRegisterPubcacheCallback - * Register the specified function to be called for all future - * publication invalidation events. The OID of the publication being - * invalidated will be passed to the function. - */ -void -CacheRegisterPubcacheCallback(PubcacheCallbackFunction func, - Datum arg) -{ - if (pubcache_callback_count >= MAX_PUBCACHE_CALLBACKS) - elog(FATAL, "out of pubcache_callback_list slots"); - - pubcache_callback_list[pubcache_callback_count].function = func; - pubcache_callback_list[pubcache_callback_count].arg = arg; - - ++pubcache_callback_count; -} - /* * CallSyscacheCallbacks * @@ -1748,9 +1628,6 @@ LogLogicalInvalidations(void) ProcessMessageSubGroupMulti(group, RelCacheMsgs, XLogRegisterData((char *) msgs, n * sizeof(SharedInvalidationMessage))); - ProcessMessageSubGroupMulti(group, PubCacheMsgs, - XLogRegisterData((char *) msgs, - n * sizeof(SharedInvalidationMessage))); XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); } } diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 5487c571f6..b953193812 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -35,5 +35,6 @@ extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot); extern bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot); +extern ObjectAddress RenamePublication(const char *oldname, const char *newname); #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/storage/sinval.h b/src/include/storage/sinval.h index 9a97268b0a..8f5744b21b 100644 --- a/src/include/storage/sinval.h +++ b/src/include/storage/sinval.h @@ -110,14 +110,6 @@ typedef struct Oid relId; /* relation ID */ } SharedInvalSnapshotMsg; -#define SHAREDINVALPUBCACHE_ID (-6) -typedef struct -{ - int8 id; /* type field --- must be 
first */ - Oid dbId; /* database ID, or 0 if a shared relation */ - Oid pubId; /* publication ID */ -} SharedInvalPubcacheMsg; - typedef union { int8 id; /* type field --- must be first */ @@ -127,7 +119,6 @@ typedef union SharedInvalSmgrMsg sm; SharedInvalRelmapMsg rm; SharedInvalSnapshotMsg sn; - SharedInvalPubcacheMsg pc; } SharedInvalidationMessage; diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index 66d27b8bee..24695facf2 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -22,7 +22,6 @@ extern PGDLLIMPORT int debug_discard_caches; typedef void (*SyscacheCallbackFunction) (Datum arg, int cacheid, uint32 hashvalue); typedef void (*RelcacheCallbackFunction) (Datum arg, Oid relid); -typedef void (*PubcacheCallbackFunction) (Datum arg, Oid pubid); extern void AcceptInvalidationMessages(void); @@ -60,9 +59,6 @@ extern void CacheRegisterSyscacheCallback(int cacheid, extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg); -extern void CacheRegisterPubcacheCallback(PubcacheCallbackFunction func, - Datum arg); - extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue); extern void InvalidateSystemCaches(void);