On 2020/1/15 08:30, Quan Zongliang wrote:
On 2020/1/3 17:14, Peter Eisentraut wrote:
On 2019-11-01 04:39, Euler Taveira wrote:
ATExecAlterColumnType records everything that depends on the column
and for indexes it saves the definition (via pg_get_indexdef_string).
The definition is not sufficient for reconstructing the replica identity
information because there is no such keyword for replica identity in
CREATE INDEX. The new index should call relation_mark_replica_identity
to fix pg_index.indisreplident.

Yeah, I don't think we need to do the full dance of reverse compiling the SQL command and reexecuting it, as the patch currently does. That's only necessary for rebuilding the index itself.  For re-setting the replica identity, we can just use the internal API as you say.

Also, a few test cases would be nice for this patch.


I'm a little busy. I'll write a new patch in a few days.

new patch attached.


test case 1:
create table t1 (col1 int,col2 int not null,
  col3 int not null,unique(col2,col3));
alter table t1 replica identity using index t1_col2_col3_key;
alter table t1 alter col3 type text;
alter table t1 alter col3 type smallint using col3::int;

test case 2:
create table t2 (col1 varchar(10), col2 text not null,
  col3 timestamp not null,unique(col2,col3),
  col4 int not null unique);
alter table t2 replica identity using index t2_col2_col3_key;
alter table t2 alter col3 type text;
alter table t2 replica identity using index t2_col4_key;
alter table t2 alter col4 type timestamp using '2020-02-11'::timestamp;

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index b7c8d663fc..de4da64e06 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -176,6 +176,7 @@ typedef struct AlteredTableInfo
        List       *changedConstraintDefs;      /* string definitions of same */
        List       *changedIndexOids;   /* OIDs of indexes to rebuild */
        List       *changedIndexDefs;   /* string definitions of same */
+       List       *changedReplIdentName;       /* qualified name of index to reset 
REPLICA IDENTITY */
 } AlteredTableInfo;
 
 /* Struct describing one new constraint to check in Phase 3 scan */
@@ -482,6 +483,7 @@ static ObjectAddress ATExecAlterColumnType(AlteredTableInfo 
*tab, Relation rel,
                                                                                
   AlterTableCmd *cmd, LOCKMODE lockmode);
 static void RememberConstraintForRebuilding(Oid conoid, AlteredTableInfo *tab);
 static void RememberIndexForRebuilding(Oid indoid, AlteredTableInfo *tab);
+static void RememberReplicaIdentForRebuilding(Oid indoid, AlteredTableInfo 
*tab);
 static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab,
                                                                   LOCKMODE 
lockmode);
 static void ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId,
@@ -553,6 +555,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, 
Relation partIdx,
                                                                  Relation 
partitionTbl);
 static List *GetParentedForeignKeyRefs(Relation partition);
 static void ATDetachCheckNoForeignKeyRefs(Relation partition);
+static void relation_mark_replica_identity(Relation rel, char ri_type, Oid 
indexOid, bool is_internal);
 
 
 /* ----------------------------------------------------------------
@@ -4274,7 +4277,10 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
                        Relation        rel;
                        ListCell   *lcmd;
 
-                       if (subcmds == NIL)
+                       if (subcmds == NIL && pass != AT_PASS_MISC)
+                               continue;
+                       if (subcmds == NIL && pass == AT_PASS_MISC &&
+                                       tab->changedReplIdentName==NIL)
                                continue;
 
                        /*
@@ -4295,6 +4301,18 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
                        if (pass == AT_PASS_ALTER_TYPE)
                                ATPostAlterTypeCleanup(wqueue, tab, lockmode);
 
+                       if (pass == AT_PASS_MISC && tab->changedReplIdentName)
+                       {
+                               Relation        idxrel;
+                               ObjectAddress address = 
get_object_address(OBJECT_INDEX,
+                                                               
(Node*)tab->changedReplIdentName, &idxrel,
+                                                               
AccessExclusiveLock, false);
+                               relation_close(idxrel, NoLock);
+
+                               relation_mark_replica_identity(rel, 
REPLICA_IDENTITY_INDEX,
+                                                               
address.objectId, true);
+                       }
+
                        relation_close(rel, NoLock);
                }
        }
@@ -11231,6 +11249,9 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation 
rel,
                                        {
                                                Assert(foundObject.objectSubId 
== 0);
                                                
RememberIndexForRebuilding(foundObject.objectId, tab);
+                                               if 
(RelationGetForm(rel)->relreplident==REPLICA_IDENTITY_INDEX &&
+                                                               
index_is_replident(foundObject.objectId))
+                                                       
RememberReplicaIdentForRebuilding(foundObject.objectId, tab);
                                        }
                                        else if (relKind == RELKIND_SEQUENCE)
                                        {
@@ -11265,9 +11286,20 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation 
rel,
                                }
 
                        case OCLASS_CONSTRAINT:
-                               Assert(foundObject.objectSubId == 0);
-                               
RememberConstraintForRebuilding(foundObject.objectId, tab);
-                               break;
+                               {
+                                       Oid indId;
+
+                                       Assert(foundObject.objectSubId == 0);
+                                       
RememberConstraintForRebuilding(foundObject.objectId, tab);
+
+                                       indId = 
get_constraint_index(foundObject.objectId);
+                                       if (OidIsValid(indId) &&
+                                                       
RelationGetForm(rel)->relreplident==REPLICA_IDENTITY_INDEX &&
+                                                       
index_is_replident(indId))
+                                               
RememberReplicaIdentForRebuilding(indId, tab);
+
+                                       break;
+                               }
 
                        case OCLASS_REWRITE:
                                /* XXX someday see if we can cope with revising 
views */
@@ -11590,6 +11622,26 @@ RememberConstraintForRebuilding(Oid conoid, 
AlteredTableInfo *tab)
        }
 }
 
+/*
+ * Subroutine for ATExecAlterColumnType: remember that the replica identity
+ * needs to be reset (which we might already know).
+ */
+static void
+RememberReplicaIdentForRebuilding(Oid indoid, AlteredTableInfo *tab)
+{
+       char       *nspName;
+       char       *relName;
+
+       /* just in case */
+       if (tab->changedReplIdentName)
+               return;
+
+       relName = get_rel_name(indoid);
+       nspName = get_namespace_name(get_rel_namespace(indoid));
+
+       tab->changedReplIdentName = list_make2(makeString(nspName), 
makeString(relName));
+}
+
 /*
  * Subroutine for ATExecAlterColumnType: remember that an index needs
  * to be rebuilt (which we might already know).
diff --git a/src/backend/utils/cache/lsyscache.c 
b/src/backend/utils/cache/lsyscache.c
index fb0599f7f1..d74c5a1427 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -3227,3 +3227,21 @@ get_index_column_opclass(Oid index_oid, int attno)
 
        return opclass;
 }
+
+bool
+index_is_replident(Oid index_oid)
+{
+       HeapTuple               tuple;
+       Form_pg_index   rd_index;
+       bool                    result;
+
+       tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
+       if (!HeapTupleIsValid(tuple))
+               return false;
+
+       rd_index = (Form_pg_index) GETSTRUCT(tuple);
+       result = rd_index->indisreplident;
+       ReleaseSysCache(tuple);
+
+       return result;
+}
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 370f62a133..b91cb84b15 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -181,6 +181,7 @@ extern char *get_namespace_name_or_temp(Oid nspid);
 extern Oid     get_range_subtype(Oid rangeOid);
 extern Oid     get_range_collation(Oid rangeOid);
 extern Oid     get_index_column_opclass(Oid index_oid, int attno);
+extern bool    index_is_replident(Oid index_oid);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */

Reply via email to