Rebase to current sources, to appease CF bot; no other changes.

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index b1de6d0674..ea3ae56991 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -4485,6 +4485,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        when using declarative partitioning.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inhdetached</structfield> <type>bool</type>
+      </para>
+      <para>
+       Set to true for a partition that is in the process of being detached;
+       false otherwise.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index c25ef5abd6..09673dcf1c 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -36,7 +36,9 @@ ALTER TABLE ALL IN TABLESPACE <replaceable class="parameter">name</replaceable>
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
     ATTACH PARTITION <replaceable class="parameter">partition_name</replaceable> { FOR VALUES <replaceable class="parameter">partition_bound_spec</replaceable> | DEFAULT }
 ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
-    DETACH PARTITION <replaceable class="parameter">partition_name</replaceable>
+    DETACH PARTITION <replaceable class="parameter">partition_name</replaceable> [ CONCURRENTLY ]
+ALTER TABLE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
+    DETACH PARTITION <replaceable class="parameter">partition_name</replaceable> FINALIZE
 
 <phrase>where <replaceable class="parameter">action</replaceable> is one of:</phrase>
 
@@ -938,7 +940,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
    </varlistentry>
 
    <varlistentry>
-    <term><literal>DETACH PARTITION</literal> <replaceable class="parameter">partition_name</replaceable></term>
+    <term><literal>DETACH PARTITION</literal> <replaceable class="parameter">partition_name</replaceable> [ { CONCURRENTLY | FINALIZE } ]</term>
+
     <listitem>
      <para>
       This form detaches the specified partition of the target table.  The detached
@@ -947,6 +950,24 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       attached to the target table's indexes are detached.  Any triggers that
       were created as clones of those in the target table are removed.
      </para>
+     <para>
+      If <literal>CONCURRENTLY</literal> is specified, this process runs in two
+      transactions in order to avoid blocking other sessions that might be accessing
+      the partitioned table.  During the first transaction, a
+      <literal>SHARE UPDATE EXCLUSIVE</literal> lock is taken on both the parent table and
+      the partition, and the partition is marked as undergoing detach; at that point, the
+      transaction is committed and all other transactions using the partitioned table are waited for.
+      Once all those transactions are gone, the second stage acquires
+      <literal>ACCESS EXCLUSIVE</literal> on the partition, and the detach process
+      completes.
+      <literal>CONCURRENTLY</literal> is not allowed if the
+      partitioned table contains a default partition.
+     </para>
+     <para>
+      If <literal>FINALIZE</literal> is specified, a previous
+      <literal>DETACH CONCURRENTLY</literal> invocation that was cancelled or
+      interrupted is completed.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 9abc4a1f55..1a05f2a2fe 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -2555,10 +2555,12 @@ StoreConstraints(Relation rel, List *cooked_constraints, bool is_internal)
  * Returns a list of CookedConstraint nodes that shows the cooked form of
  * the default and constraint expressions added to the relation.
  *
- * NB: caller should have opened rel with AccessExclusiveLock, and should
- * hold that lock till end of transaction.  Also, we assume the caller has
- * done a CommandCounterIncrement if necessary to make the relation's catalog
- * tuples visible.
+ * NB: caller should have opened rel with some self-conflicting lock mode,
+ * and should hold that lock till end of transaction; for normal cases that'll
+ * be AccessExclusiveLock, but if caller knows that the constraint is already
+ * enforced by some other means, it can be ShareUpdateExclusiveLock.  Also, we
+ * assume the caller has done a CommandCounterIncrement if necessary to make
+ * the relation's catalog tuples visible.
  */
 List *
 AddRelationNewConstraints(Relation rel,
@@ -3827,7 +3829,8 @@ StorePartitionBound(Relation rel, Relation parent, PartitionBoundSpec *bound)
 	 * relcache entry for that partition every time a partition is added or
 	 * removed.
 	 */
-	defaultPartOid = get_default_oid_from_partdesc(RelationGetPartitionDesc(parent));
+	defaultPartOid =
+		get_default_oid_from_partdesc(RelationGetPartitionDesc(parent, false));
 	if (OidIsValid(defaultPartOid))
 		CacheInvalidateRelcacheByRelid(defaultPartOid);
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 4ef61b5efd..28f04a0700 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1836,7 +1836,7 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, const char *oldName)
 		List	   *ancestors = get_partition_ancestors(oldIndexId);
 		Oid			parentIndexRelid = linitial_oid(ancestors);
 
-		DeleteInheritsTuple(oldIndexId, parentIndexRelid);
+		DeleteInheritsTuple(oldIndexId, parentIndexRelid, false, NULL);
 		StoreSingleInheritance(newIndexId, parentIndexRelid, 1);
 
 		list_free(ancestors);
@@ -2486,7 +2486,7 @@ index_drop(Oid indexId, bool concurrent, bool concurrent_lock_mode)
 	/*
 	 * fix INHERITS relation
 	 */
-	DeleteInheritsTuple(indexId, InvalidOid);
+	DeleteInheritsTuple(indexId, InvalidOid, false, NULL);
 
 	/*
 	 * We are presently too lazy to attempt to compute the new correct value
diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index af7754d6ab..4327a0b150 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -32,7 +32,7 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
-static Oid	get_partition_parent_worker(Relation inhRel, Oid relid);
+static Oid	get_partition_parent_worker(Relation inhRel, Oid relid, bool *detached);
 static void get_partition_ancestors_worker(Relation inhRel, Oid relid,
 										   List **ancestors);
 
@@ -42,6 +42,9 @@ static void get_partition_ancestors_worker(Relation inhRel, Oid relid,
  *
  * Returns inheritance parent of a partition by scanning pg_inherits
  *
+ * If the partition is in the process of being detached, InvalidOid is
+ * returned and no error is thrown.
+ *
  * Note: Because this function assumes that the relation whose OID is passed
  * as an argument will have precisely one parent, it should only be called
  * when it is known that the relation is a partition.
@@ -51,12 +54,15 @@ get_partition_parent(Oid relid)
 {
 	Relation	catalogRelation;
 	Oid			result;
+	bool		detached;
 
 	catalogRelation = table_open(InheritsRelationId, AccessShareLock);
 
-	result = get_partition_parent_worker(catalogRelation, relid);
+	result = get_partition_parent_worker(catalogRelation, relid, &detached);
 
-	if (!OidIsValid(result))
+	if (detached)
+		result = InvalidOid;
+	else if (!OidIsValid(result))
 		elog(ERROR, "could not find tuple for parent of relation %u", relid);
 
 	table_close(catalogRelation, AccessShareLock);
@@ -70,13 +76,15 @@ get_partition_parent(Oid relid)
  *		given relation
  */
 static Oid
-get_partition_parent_worker(Relation inhRel, Oid relid)
+get_partition_parent_worker(Relation inhRel, Oid relid, bool *detached)
 {
 	SysScanDesc scan;
 	ScanKeyData key[2];
 	Oid			result = InvalidOid;
 	HeapTuple	tuple;
 
+	*detached = false;
+
 	ScanKeyInit(&key[0],
 				Anum_pg_inherits_inhrelid,
 				BTEqualStrategyNumber, F_OIDEQ,
@@ -93,7 +101,14 @@ get_partition_parent_worker(Relation inhRel, Oid relid)
 	{
 		Form_pg_inherits form = (Form_pg_inherits) GETSTRUCT(tuple);
 
-		result = form->inhparent;
+		/* A partition being detached has no parent */
+		if (form->inhdetached)
+		{
+			*detached = true;
+			result = InvalidOid;
+		}
+		else
+			result = form->inhparent;
 	}
 
 	systable_endscan(scan);
@@ -134,9 +149,10 @@ static void
 get_partition_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
 {
 	Oid			parentOid;
+	bool		detached;
 
 	/* Recursion ends at the topmost level, ie., when there's no parent */
-	parentOid = get_partition_parent_worker(inhRel, relid);
+	parentOid = get_partition_parent_worker(inhRel, relid, &detached);
 	if (parentOid == InvalidOid)
 		return;
 
diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index 5ab7902827..3fd0880ca0 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -52,7 +52,8 @@ typedef struct SeenRelsEntry
  * against possible DROPs of child relations.
  */
 List *
-find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
+find_inheritance_children(Oid parentrelId, bool include_detached,
+						  LOCKMODE lockmode)
 {
 	List	   *list = NIL;
 	Relation	relation;
@@ -91,7 +92,13 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
 
 	while ((inheritsTuple = systable_getnext(scan)) != NULL)
 	{
+		/* skip detached at caller's request */
+		if (((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhdetached &&
+			!include_detached)
+			continue;
+
 		inhrelid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhrelid;
+
 		if (numoids >= maxoids)
 		{
 			maxoids *= 2;
@@ -160,6 +167,9 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode)
  * given rel; caller should already have locked it).  If lockmode is NoLock
  * then no locks are acquired, but caller must beware of race conditions
  * against possible DROPs of child relations.
+ *
+ * NB - No current callers of this routine are interested in children being
+ * concurrently detached, so there's no provision to include them.
  */
 List *
 find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
@@ -199,7 +209,8 @@ find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
 		ListCell   *lc;
 
 		/* Get the direct children of this rel */
-		currentchildren = find_inheritance_children(currentrel, lockmode);
+		currentchildren = find_inheritance_children(currentrel, false,
+													lockmode);
 
 		/*
 		 * Add to the queue only those children not already seen. This avoids
@@ -430,6 +441,7 @@ StoreSingleInheritance(Oid relationId, Oid parentOid, int32 seqNumber)
 	values[Anum_pg_inherits_inhrelid - 1] = ObjectIdGetDatum(relationId);
 	values[Anum_pg_inherits_inhparent - 1] = ObjectIdGetDatum(parentOid);
 	values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(seqNumber);
+	values[Anum_pg_inherits_inhdetached - 1] = BoolGetDatum(false);
 
 	memset(nulls, 0, sizeof(nulls));
 
@@ -449,10 +461,21 @@ StoreSingleInheritance(Oid relationId, Oid parentOid, int32 seqNumber)
  * as InvalidOid, in which case all tuples matching inhrelid are deleted;
  * otherwise only delete tuples with the specified inhparent.
  *
+ * 'detached' is the expected state of the inhdetached flag.  If the catalog
+ * row does not match that state, an error is raised.  When used on
+ * partitions, the partition name must be passed, for possible error messages.
+ *
+ * That is: if 'detached' is passed false, an error is raised if the child is
+ * already marked detached; if passed true, an error is raised if it is not.
+ * 'childname' is used in those messages.  Only meaningful for partitions.
+ * XXX the name bit is pretty weird and problematic for non-partition cases;
+ * what if the flag check fails?
+ *
  * Returns whether at least one row was deleted.
  */
 bool
-DeleteInheritsTuple(Oid inhrelid, Oid inhparent)
+DeleteInheritsTuple(Oid inhrelid, Oid inhparent, bool detached,
+					const char *childname)
 {
 	bool		found = false;
 	Relation	catalogRelation;
@@ -479,6 +502,28 @@ DeleteInheritsTuple(Oid inhrelid, Oid inhparent)
 		parent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
 		if (!OidIsValid(inhparent) || parent == inhparent)
 		{
+			bool	inhdetached;
+
+			inhdetached = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhdetached;
+
+			/*
+			 * Raise error depending on state.  This should only happen for
+			 * partitions, but we have no way to cross-check.
+			 */
+			if (inhdetached && !detached)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot detach partition \"%s\"",
+								childname),
+						 errdetail("The partition is being detached concurrently or has an unfinished detach."),
+						 errhint("Use ALTER TABLE ... DETACH PARTITION ... FINALIZE to complete the pending detach operation")));
+			if (!inhdetached && detached)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("cannot complete detaching partition \"%s\"",
+								childname),
+						 errdetail("There's no pending concurrent detach.")));
+
 			CatalogTupleDelete(catalogRelation, &inheritsTuple->t_self);
 			found = true;
 		}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 8bc652ecd3..82670bac21 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -410,7 +410,7 @@ CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts)
  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
  * doesn't show up in the output, we know we can forget about it.
  */
-static void
+void
 WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
 {
 	int			n_old_snapshots;
@@ -1123,7 +1123,7 @@ DefineIndex(Oid relationId,
 	 */
 	if (partitioned && stmt->relation && !stmt->relation->inh)
 	{
-		PartitionDesc pd = RelationGetPartitionDesc(rel);
+		PartitionDesc pd = RelationGetPartitionDesc(rel, false);
 
 		if (pd->nparts != 0)
 			flags |= INDEX_CREATE_INVALID;
@@ -1180,7 +1180,7 @@ DefineIndex(Oid relationId,
 		 *
 		 * If we're called internally (no stmt->relation), recurse always.
 		 */
-		partdesc = RelationGetPartitionDesc(rel);
+		partdesc = RelationGetPartitionDesc(rel, false);
 		if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
 		{
 			int			nparts = partdesc->nparts;
@@ -4073,6 +4073,7 @@ IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
 			values[Anum_pg_inherits_inhparent - 1] =
 				ObjectIdGetDatum(parentOid);
 			values[Anum_pg_inherits_inhseqno - 1] = Int32GetDatum(1);
+			values[Anum_pg_inherits_inhdetached - 1] = BoolGetDatum(false);
 			memset(isnull, false, sizeof(isnull));
 
 			tuple = heap_form_tuple(RelationGetDescr(pg_inherits),
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 559fa1d2e5..5986b4dd56 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -156,6 +156,8 @@ typedef struct AlteredTableInfo
 	Oid			relid;			/* Relation to work on */
 	char		relkind;		/* Its relkind */
 	TupleDesc	oldDesc;		/* Pre-modification tuple descriptor */
+	/* Transiently set during Phase 2, normally set to NULL */
+	Relation	rel;
 	/* Information saved by Phase 1 for Phase 2: */
 	List	   *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
 	/* Information saved by Phases 1/2 for Phase 3: */
@@ -353,7 +355,7 @@ static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 					  AlterTableUtilityContext *context);
 static void ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 							  AlterTableUtilityContext *context);
-static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+static void ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 					  AlterTableCmd *cmd, LOCKMODE lockmode, int cur_pass,
 					  AlterTableUtilityContext *context);
 static AlterTableCmd *ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab,
@@ -539,7 +541,8 @@ static PartitionSpec *transformPartitionSpec(Relation rel, PartitionSpec *partsp
 static void ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partParams, AttrNumber *partattrs,
 								  List **partexprs, Oid *partopclass, Oid *partcollation, char strategy);
 static void CreateInheritance(Relation child_rel, Relation parent_rel);
-static void RemoveInheritance(Relation child_rel, Relation parent_rel);
+static void RemoveInheritance(Relation child_rel, Relation parent_rel,
+							  bool expect_detached);
 static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
 										   PartitionCmd *cmd,
 										   AlterTableUtilityContext *context);
@@ -548,8 +551,14 @@ static void QueuePartitionConstraintValidation(List **wqueue, Relation scanrel,
 											   List *partConstraint,
 											   bool validate_default);
 static void CloneRowTriggersToPartition(Relation parent, Relation partition);
+static void DetachAddConstraintIfNeeded(List **wqueue, Relation partRel);
 static void DropClonedTriggersFromPartition(Oid partitionId);
-static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name);
+static ObjectAddress ATExecDetachPartition(List **wqueue, AlteredTableInfo *tab,
+										   Relation rel, RangeVar *name,
+										   bool concurrent);
+static void DetachPartitionFinalize(Relation rel, Relation partRel,
+									bool concurrent, Oid defaultPartOid, bool wait);
+static ObjectAddress ATExecDetachPartitionFinalize(Relation rel, RangeVar *name);
 static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation rel,
 											  RangeVar *name);
 static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl);
@@ -984,7 +993,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
 		 * lock the partition so as to avoid a deadlock.
 		 */
 		defaultPartOid =
-			get_default_oid_from_partdesc(RelationGetPartitionDesc(parent));
+			get_default_oid_from_partdesc(RelationGetPartitionDesc(parent,
+																   false));
 		if (OidIsValid(defaultPartOid))
 			defaultRel = table_open(defaultPartOid, AccessExclusiveLock);
 
@@ -3263,7 +3273,7 @@ renameatt_internal(Oid myrelid,
 		 * expected_parents will only be 0 if we are not already recursing.
 		 */
 		if (expected_parents == 0 &&
-			find_inheritance_children(myrelid, NoLock) != NIL)
+			find_inheritance_children(myrelid, false, NoLock) != NIL)
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 					 errmsg("inherited column \"%s\" must be renamed in child tables too",
@@ -3462,7 +3472,7 @@ rename_constraint_internal(Oid myrelid,
 		else
 		{
 			if (expected_parents == 0 &&
-				find_inheritance_children(myrelid, NoLock) != NIL)
+				find_inheritance_children(myrelid, false, NoLock) != NIL)
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 						 errmsg("inherited constraint \"%s\" must be renamed in child tables too",
@@ -4081,7 +4091,14 @@ AlterTableGetLockLevel(List *cmds)
 				break;
 
 			case AT_DetachPartition:
-				cmd_lockmode = AccessExclusiveLock;
+				if (((PartitionCmd *) cmd->def)->concurrent)
+					cmd_lockmode = ShareUpdateExclusiveLock;
+				else
+					cmd_lockmode = AccessExclusiveLock;
+				break;
+
+			case AT_DetachPartitionFinalize:
+				cmd_lockmode = ShareUpdateExclusiveLock;
 				break;
 
 			case AT_CheckNotNull:
@@ -4472,6 +4489,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
 			/* No command-specific prep needed */
 			pass = AT_PASS_MISC;
 			break;
+		case AT_DetachPartitionFinalize:
+			ATSimplePermissions(rel, ATT_TABLE);
+			/* No command-specific prep needed */
+			pass = AT_PASS_MISC;
+			break;
 		default:				/* oops */
 			elog(ERROR, "unrecognized alter table type: %d",
 				 (int) cmd->subtype);
@@ -4512,7 +4534,6 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 		{
 			AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
 			List	   *subcmds = tab->subcmds[pass];
-			Relation	rel;
 			ListCell   *lcmd;
 
 			if (subcmds == NIL)
@@ -4521,10 +4542,10 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 			/*
 			 * Appropriate lock was obtained by phase 1, needn't get it again
 			 */
-			rel = relation_open(tab->relid, NoLock);
+			tab->rel = relation_open(tab->relid, NoLock);
 
 			foreach(lcmd, subcmds)
-				ATExecCmd(wqueue, tab, rel,
+				ATExecCmd(wqueue, tab,
 						  castNode(AlterTableCmd, lfirst(lcmd)),
 						  lockmode, pass, context);
 
@@ -4536,7 +4557,11 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 			if (pass == AT_PASS_ALTER_TYPE)
 				ATPostAlterTypeCleanup(wqueue, tab, lockmode);
 
-			relation_close(rel, NoLock);
+			if (tab->rel)
+			{
+				relation_close(tab->rel, NoLock);
+				tab->rel = NULL;
+			}
 		}
 	}
 
@@ -4562,11 +4587,12 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
  * ATExecCmd: dispatch a subcommand to appropriate execution routine
  */
 static void
-ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ATExecCmd(List **wqueue, AlteredTableInfo *tab,
 		  AlterTableCmd *cmd, LOCKMODE lockmode, int cur_pass,
 		  AlterTableUtilityContext *context)
 {
 	ObjectAddress address = InvalidObjectAddress;
+	Relation	rel = tab->rel;
 
 	switch (cmd->subtype)
 	{
@@ -4853,7 +4879,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 			Assert(cmd != NULL);
 			/* ATPrepCmd ensures it must be a table */
 			Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
-			ATExecDetachPartition(rel, ((PartitionCmd *) cmd->def)->name);
+			ATExecDetachPartition(wqueue, tab, rel,
+								  ((PartitionCmd *) cmd->def)->name,
+								  ((PartitionCmd *) cmd->def)->concurrent);
+			break;
+		case AT_DetachPartitionFinalize:
+			ATExecDetachPartitionFinalize(rel, ((PartitionCmd *) cmd->def)->name);
 			break;
 		case AT_AlterCollationRefreshVersion:
 			/* ATPrepCmd ensured it must be an index */
@@ -5669,6 +5700,7 @@ ATGetQueueEntry(List **wqueue, Relation rel)
 	 */
 	tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
 	tab->relid = relid;
+	tab->rel = NULL;			/* set later */
 	tab->relkind = rel->rd_rel->relkind;
 	tab->oldDesc = CreateTupleDescCopyConstr(RelationGetDescr(rel));
 	tab->newrelpersistence = RELPERSISTENCE_PERMANENT;
@@ -6284,7 +6316,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 */
 	if (colDef->identity &&
 		recurse &&
-		find_inheritance_children(myrelid, NoLock) != NIL)
+		find_inheritance_children(myrelid, false, NoLock) != NIL)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 				 errmsg("cannot recursively add identity column to table that has child tables")));
@@ -6517,7 +6549,8 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
-	children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+	children =
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 
 	/*
 	 * If we are told not to recurse, there had better not be any child
@@ -6671,7 +6704,7 @@ ATPrepDropNotNull(Relation rel, bool recurse, bool recursing)
 	 */
 	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+		PartitionDesc partdesc = RelationGetPartitionDesc(rel, false);
 
 		Assert(partdesc != NULL);
 		if (partdesc->nparts > 0 && !recurse && !recursing)
@@ -6771,17 +6804,22 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
 	if (rel->rd_rel->relispartition)
 	{
 		Oid			parentId = get_partition_parent(RelationGetRelid(rel));
-		Relation	parent = table_open(parentId, AccessShareLock);
-		TupleDesc	tupDesc = RelationGetDescr(parent);
+		Relation	parent;
+		TupleDesc	tupDesc;
 		AttrNumber	parent_attnum;
 
-		parent_attnum = get_attnum(parentId, colName);
-		if (TupleDescAttr(tupDesc, parent_attnum - 1)->attnotnull)
-			ereport(ERROR,
-					(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-					 errmsg("column \"%s\" is marked NOT NULL in parent table",
-							colName)));
-		table_close(parent, AccessShareLock);
+		if (parentId != InvalidOid)
+		{
+			parent = table_open(parentId, AccessShareLock);
+			tupDesc = RelationGetDescr(parent);
+			parent_attnum = get_attnum(parentId, colName);
+			if (TupleDescAttr(tupDesc, parent_attnum - 1)->attnotnull)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+						 errmsg("column \"%s\" is marked NOT NULL in parent table",
+								colName)));
+			table_close(parent, AccessShareLock);
+		}
 	}
 
 	/*
@@ -7380,7 +7418,7 @@ ATPrepDropExpression(Relation rel, AlterTableCmd *cmd, bool recurse, bool recurs
 	 * resulting state can be properly dumped and restored.
 	 */
 	if (!recurse &&
-		find_inheritance_children(RelationGetRelid(rel), lockmode))
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("ALTER TABLE / DROP EXPRESSION must be applied to child tables too")));
@@ -7963,7 +8001,8 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
-	children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+	children =
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 
 	if (children)
 	{
@@ -8427,7 +8466,8 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
-	children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+	children =
+		find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 
 	/*
 	 * Check if ONLY was specified with ALTER TABLE.  If so, allow the
@@ -9042,7 +9082,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
 	 */
 	if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc pd = RelationGetPartitionDesc(pkrel);
+		PartitionDesc pd = RelationGetPartitionDesc(pkrel, false);
 
 		for (int i = 0; i < pd->nparts; i++)
 		{
@@ -9176,7 +9216,7 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
 	}
 	else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc pd = RelationGetPartitionDesc(rel);
+		PartitionDesc pd = RelationGetPartitionDesc(rel, false);
 
 		/*
 		 * Recurse to take appropriate action on each partition; either we
@@ -10960,7 +11000,8 @@ ATExecDropConstraint(Relation rel, const char *constrName,
 	 * use find_all_inheritors to do it in one pass.
 	 */
 	if (!is_no_inherit_constraint)
-		children = find_inheritance_children(RelationGetRelid(rel), lockmode);
+		children =
+			find_inheritance_children(RelationGetRelid(rel), false, lockmode);
 	else
 		children = NIL;
 
@@ -11344,7 +11385,8 @@ ATPrepAlterColumnType(List **wqueue,
 		}
 	}
 	else if (!recursing &&
-			 find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
+			 find_inheritance_children(RelationGetRelid(rel), false,
+									   NoLock) != NIL)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 				 errmsg("type of inherited column \"%s\" must be changed in child tables too",
@@ -14165,7 +14207,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
 	 */
 
 	/* Off to RemoveInheritance() where most of the work happens */
-	RemoveInheritance(rel, parent_rel);
+	RemoveInheritance(rel, parent_rel, false);
 
 	ObjectAddressSet(address, RelationRelationId,
 					 RelationGetRelid(parent_rel));
@@ -14176,12 +14218,70 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
 	return address;
 }
 
+/*
+ * MarkInheritDetached
+ *
+ * For a concurrent detach the pg_inherits row is not removed until a second
+ * transaction; as a preparatory step, set inhdetached on the child's row so
+ * that other transactions know to ignore the partition.  Errors out if
+ * child_rel has no pg_inherits row, or has one naming a different parent.
+ */
+static void
+MarkInheritDetached(Relation child_rel, Relation parent_rel)
+{
+	Relation	catalogRelation;
+	SysScanDesc	scan;
+	ScanKeyData key;
+	HeapTuple	inheritsTuple;
+	bool		found = false;
+
+	/*
+	 * Scan pg_inherits by inhrelid; every row found must name parent_rel.
+	 */
+	catalogRelation = table_open(InheritsRelationId, RowExclusiveLock);
+	ScanKeyInit(&key,
+				Anum_pg_inherits_inhrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(RelationGetRelid(child_rel)));
+	scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
+							  true, NULL, 1, &key);
+
+	while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+	{
+		HeapTuple	newtup;
+
+		if (((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent !=
+			RelationGetRelid(parent_rel))
+			elog(ERROR, "bad parent tuple found for partition %u",
+				 RelationGetRelid(child_rel));
+
+		newtup = heap_copytuple(inheritsTuple);
+		((Form_pg_inherits) GETSTRUCT(newtup))->inhdetached = true;
+
+		CatalogTupleUpdate(catalogRelation,
+						   &inheritsTuple->t_self,
+						   newtup);
+		found = true;
+	}
+
+	/* Done scanning; complain below if no row was marked */
+	systable_endscan(scan);
+	table_close(catalogRelation, RowExclusiveLock);
+
+	if (!found)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("relation \"%s\" is not a partition of relation \"%s\"",
+						RelationGetRelationName(child_rel),
+						RelationGetRelationName(parent_rel))));
+}
+
 /*
  * RemoveInheritance
  *
  * Drop a parent from the child's parents. This just adjusts the attinhcount
  * and attislocal of the columns and removes the pg_inherit and pg_depend
- * entries.
+ * entries.  expect_detached is passed down to DeleteInheritsTuple, q.v.
  *
  * If attinhcount goes to 0 then attislocal gets set to true. If it goes back
  * up attislocal stays true, which means if a child is ever removed from a
@@ -14195,7 +14295,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
  * Common to ATExecDropInherit() and ATExecDetachPartition().
  */
 static void
-RemoveInheritance(Relation child_rel, Relation parent_rel)
+RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached)
 {
 	Relation	catalogRelation;
 	SysScanDesc scan;
@@ -14211,7 +14311,9 @@ RemoveInheritance(Relation child_rel, Relation parent_rel)
 		child_is_partition = true;
 
 	found = DeleteInheritsTuple(RelationGetRelid(child_rel),
-								RelationGetRelid(parent_rel));
+								RelationGetRelid(parent_rel),
+								expect_detached,
+								RelationGetRelationName(child_rel));
 	if (!found)
 	{
 		if (child_is_partition)
@@ -16296,7 +16398,7 @@ QueuePartitionConstraintValidation(List **wqueue, Relation scanrel,
 	}
 	else if (scanrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(scanrel);
+		PartitionDesc partdesc = RelationGetPartitionDesc(scanrel, false);
 		int			i;
 
 		for (i = 0; i < partdesc->nparts; i++)
@@ -16356,7 +16458,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd,
 	 * new partition will change its partition constraint.
 	 */
 	defaultPartOid =
-		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel));
+		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, false));
 	if (OidIsValid(defaultPartOid))
 		LockRelationOid(defaultPartOid, AccessExclusiveLock);
 
@@ -16945,105 +17047,230 @@ CloneRowTriggersToPartition(Relation parent, Relation partition)
  * ALTER TABLE DETACH PARTITION
  *
  * Return the address of the relation that is no longer a partition of rel.
+ *
+ * If concurrent mode is requested, we run in two transactions.  A side-
+ * effect is that this command cannot run in a multi-part ALTER TABLE.
+ * Currently, that's enforced by the grammar.
+ *
+ * The strategy for concurrency is to first modify the partition catalog
+ * rows to make it visible to everyone that the partition is detached,
+ * lock the partition against writes, and commit the transaction; anyone
+ * who requests the partition descriptor from that point onwards has to
+ * ignore such a partition.  In a second transaction, we wait until all
+ * transactions that could have seen the partition as attached are gone,
+ * then we remove the rest of partition metadata (pg_inherits and
+ * pg_class.relpartbound).
  */
 static ObjectAddress
-ATExecDetachPartition(Relation rel, RangeVar *name)
+ATExecDetachPartition(List **wqueue, AlteredTableInfo *tab, Relation rel,
+					  RangeVar *name, bool concurrent)
 {
-	Relation	partRel,
-				classRel;
-	HeapTuple	tuple,
-				newtuple;
-	Datum		new_val[Natts_pg_class];
-	bool		new_null[Natts_pg_class],
-				new_repl[Natts_pg_class];
+	Relation	partRel;
 	ObjectAddress address;
 	Oid			defaultPartOid;
-	List	   *indexes;
-	List	   *fks;
-	ListCell   *cell;
 
 	/*
 	 * We must lock the default partition, because detaching this partition
 	 * will change its partition constraint.
 	 */
 	defaultPartOid =
-		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel));
-	if (OidIsValid(defaultPartOid))
-		LockRelationOid(defaultPartOid, AccessExclusiveLock);
-
-	partRel = table_openrv(name, ShareUpdateExclusiveLock);
-
-	/* Ensure that foreign keys still hold after this detach */
-	ATDetachCheckNoForeignKeyRefs(partRel);
-
-	/* All inheritance related checks are performed within the function */
-	RemoveInheritance(partRel, rel);
-
-	/* Update pg_class tuple */
-	classRel = table_open(RelationRelationId, RowExclusiveLock);
-	tuple = SearchSysCacheCopy1(RELOID,
-								ObjectIdGetDatum(RelationGetRelid(partRel)));
-	if (!HeapTupleIsValid(tuple))
-		elog(ERROR, "cache lookup failed for relation %u",
-			 RelationGetRelid(partRel));
-	Assert(((Form_pg_class) GETSTRUCT(tuple))->relispartition);
-
-	/* Clear relpartbound and reset relispartition */
-	memset(new_val, 0, sizeof(new_val));
-	memset(new_null, false, sizeof(new_null));
-	memset(new_repl, false, sizeof(new_repl));
-	new_val[Anum_pg_class_relpartbound - 1] = (Datum) 0;
-	new_null[Anum_pg_class_relpartbound - 1] = true;
-	new_repl[Anum_pg_class_relpartbound - 1] = true;
-	newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
-								 new_val, new_null, new_repl);
-
-	((Form_pg_class) GETSTRUCT(newtuple))->relispartition = false;
-	CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
-	heap_freetuple(newtuple);
-
+		get_default_oid_from_partdesc(RelationGetPartitionDesc(rel, false));
 	if (OidIsValid(defaultPartOid))
 	{
 		/*
-		 * If the relation being detached is the default partition itself,
-		 * remove it from the parent's pg_partitioned_table entry.
+		 * Concurrent detaching when a default partition exists is not
+		 * supported. The main problem is that the default partition
+		 * constraint would change.  And there's a definitional problem: what
+		 * should happen to the tuples that are being inserted that belong to
+		 * the partition being detached?  Putting them on the partition being
+		 * detached would be wrong, since they'd become "lost" after the detach
+		 * completes; but we cannot put them in the default partition either
+		 * until we alter its partition constraint.
 		 *
-		 * If not, we must invalidate default partition's relcache entry, as
-		 * in StorePartitionBound: its partition constraint depends on every
-		 * other partition's partition constraint.
+		 * I think we could solve this problem if we effected the constraint
+		 * change before committing the first transaction.  But the lock would
+		 * have to remain AEL and it would cause concurrent query planning to
+		 * be blocked, so changing it that way would be even worse.
 		 */
-		if (RelationGetRelid(partRel) == defaultPartOid)
-			update_default_partition_oid(RelationGetRelid(rel), InvalidOid);
-		else
-			CacheInvalidateRelcacheByRelid(defaultPartOid);
+		if (concurrent)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot detach partitions concurrently when a default partition exists")));
+		LockRelationOid(defaultPartOid, AccessExclusiveLock);
 	}
 
-	/* detach indexes too */
-	indexes = RelationGetIndexList(partRel);
-	foreach(cell, indexes)
+	/*
+	 * In concurrent mode, the partition is locked with ShareLock in the
+	 * first transaction.  This blocks concurrent writes to the partition
+	 * while the detach is in progress, but still allows it to be read.
+	 */
+	partRel = table_openrv(name, concurrent ? ShareLock :
+						   AccessExclusiveLock);
+
+	/*
+	 * Check inheritance conditions and either delete the pg_inherits row
+	 * (in non-concurrent mode) or just set the inhdetached flag.
+	 */
+	if (!concurrent)
+		RemoveInheritance(partRel, rel, false);
+	else
+		MarkInheritDetached(partRel, rel);
+
+	/*
+	 * Ensure that foreign keys still hold after this detach.  This keeps
+	 * locks on the referencing tables, which prevents concurrent transactions
+	 * from adding rows that we wouldn't see.  For this to work in concurrent
+	 * mode, it is critical that the partition appears as no longer attached
+	 * for the RI queries as soon as the first transaction commits.
+	 */
+	ATDetachCheckNoForeignKeyRefs(partRel);
+
+	/*
+	 * Concurrent mode has to work harder; first we add a new constraint to the
+	 * partition that matches the partition constraint, if there isn't a matching
+	 * one already.  The reason for this is that the planner may have made
+	 * optimizations that depend on the constraint.  XXX Isn't it sufficient to
+	 * invalidate the partition's relcache entry?
+	 *
+	 * Then we close our existing transaction, and in a new one wait for
+	 * all processes to catch up on the catalog updates we've done so far; at
+	 * that point we can complete the operation.
+	 */
+	if (concurrent)
 	{
-		Oid			idxid = lfirst_oid(cell);
-		Relation	idx;
-		Oid			constrOid;
+		Oid		partrelid,
+				parentrelid;
+		LOCKTAG		tag;
+		char   *parentrelname;
+		char   *partrelname;
 
-		if (!has_superclass(idxid))
-			continue;
+		/* Add constraint, if needed. XXX hopefully we can just remove this */
+		DetachAddConstraintIfNeeded(wqueue, partRel);
 
-		Assert((IndexGetRelation(get_partition_parent(idxid), false) ==
-				RelationGetRelid(rel)));
+		/*
+		 * We're almost done now; the only traces that remain are the
+		 * pg_inherits tuple and the partition's relpartbound.  Before we can
+		 * remove those, we need to wait until all transactions that know that
+		 * this is a partition are gone.
+		 */
 
-		idx = index_open(idxid, AccessExclusiveLock);
-		IndexSetParentIndex(idx, InvalidOid);
+		/*
+		 * Remember relation OIDs to re-acquire them later; and relation names
+		 * too, for error messages if something is dropped in between.
+		 */
+		partrelid = RelationGetRelid(partRel);
+		parentrelid = RelationGetRelid(rel);
+		parentrelname = MemoryContextStrdup(PortalContext,
+											RelationGetRelationName(rel));
+		partrelname = MemoryContextStrdup(PortalContext,
+										  RelationGetRelationName(partRel));
 
-		/* If there's a constraint associated with the index, detach it too */
-		constrOid = get_relation_idx_constraint_oid(RelationGetRelid(partRel),
-													idxid);
-		if (OidIsValid(constrOid))
-			ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+		/* Invalidate relcache entries for the parent -- must be before close */
+		CacheInvalidateRelcache(rel);
+
+		table_close(partRel, NoLock);
+		table_close(rel, NoLock);
+		tab->rel = NULL;
+
+		/* Make updated catalog entry visible */
+		PopActiveSnapshot();
+		CommitTransactionCommand();
+
+		StartTransactionCommand();
+
+		/*
+		 * Now wait.  This ensures that all queries that were planned including
+		 * the partition are finished before we remove the rest of catalog
+		 * entries.  We don't need or indeed want to acquire this lock, though
+		 * -- that would block later queries.
+		 *
+		 * We don't need to concern ourselves with waiting for a lock on the
+		 * partition itself, since we will acquire AccessExclusiveLock below.
+		 */
+		SET_LOCKTAG_RELATION(tag, MyDatabaseId, parentrelid);
+		WaitForLockersMultiple(list_make1(&tag), AccessExclusiveLock, false);
+
+		/*
+		 * Now acquire locks on both relations again.  Note they may have been
+		 * removed in the meantime, so care is required.
+		 */
+		rel = try_relation_open(parentrelid, ShareUpdateExclusiveLock);
+		partRel = try_relation_open(partrelid, AccessExclusiveLock);
+
+		/* If the relations aren't there, something bad happened; bail out */
+		if (rel == NULL)
+		{
+			if (partRel != NULL)	/* shouldn't happen */
+				elog(WARNING, "dangling partition \"%s\" remains, can't fix",
+					 partrelname);
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("partitioned table \"%s\" was removed concurrently",
+							parentrelname)));
+		}
+		if (partRel == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("partition \"%s\" was removed concurrently", partrelname)));
+
+		tab->rel = rel;
 
-		index_close(idx, NoLock);
 	}
-	table_close(classRel, RowExclusiveLock);
+
+	/* Do the final part of detaching */
+	DetachPartitionFinalize(rel, partRel, concurrent, defaultPartOid, false);
+
+	ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partRel));
+
+	/* keep our lock until commit */
+	table_close(partRel, NoLock);
+
+	return address;
+}
+
+/*
+ * Second part of ALTER TABLE .. DETACH.
+ *
+ * This is separate so that it can be run independently when the second
+ * transaction of the concurrent algorithm fails (crash or abort).
+ */
+static void
+DetachPartitionFinalize(Relation rel, Relation partRel, bool concurrent,
+						Oid defaultPartOid, bool wait)
+{
+	Relation	classRel;
+	List	   *fks;
+	ListCell   *cell;
+	List	   *indexes;
+	Datum		new_val[Natts_pg_class];
+	bool		new_null[Natts_pg_class],
+				new_repl[Natts_pg_class];
+	HeapTuple	tuple,
+				newtuple;
+
+	/*
+	 * If asked to, wait until existing snapshots are gone.  This is important
+	 * if the second transaction of DETACH PARTITION CONCURRENTLY is canceled:
+	 * the user could immediately run DETACH FINALIZE without actually waiting
+	 * for existing transactions.  We must not complete the detach action until
+	 * all such queries are complete (otherwise we would present them with an
+	 * inconsistent view of catalogs).
+	 */
+	if (wait)
+	{
+		Snapshot	snap = GetActiveSnapshot();
+
+		WaitForOlderSnapshots(snap->xmin, false);
+	}
+
+	if (concurrent)
+	{
+		/*
+		 * We can remove the pg_inherits row now. (In the non-concurrent case,
+		 * this was already done).
+		 */
+		RemoveInheritance(partRel, rel, true);
+	}
 
 	/* Drop any triggers that were cloned on creation/attach. */
 	DropClonedTriggersFromPartition(RelationGetRelid(partRel));
@@ -17116,22 +17343,150 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
 		ObjectAddressSet(constraint, ConstraintRelationId, constrOid);
 		performDeletion(&constraint, DROP_RESTRICT, 0);
 	}
-	CommandCounterIncrement();
+
+	/* Now we can detach indexes */
+	indexes = RelationGetIndexList(partRel);
+	foreach(cell, indexes)
+	{
+		Oid			idxid = lfirst_oid(cell);
+		Relation	idx;
+		Oid			constrOid;
+
+		if (!has_superclass(idxid))
+			continue;
+
+		Assert((IndexGetRelation(get_partition_parent(idxid), false) ==
+				RelationGetRelid(rel)));
+
+		idx = index_open(idxid, AccessExclusiveLock);
+		IndexSetParentIndex(idx, InvalidOid);
+
+		/* If there's a constraint associated with the index, detach it too */
+		constrOid = get_relation_idx_constraint_oid(RelationGetRelid(partRel),
+													idxid);
+		if (OidIsValid(constrOid))
+			ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
+
+		index_close(idx, NoLock);
+	}
+
+	/* Update pg_class tuple */
+	classRel = table_open(RelationRelationId, RowExclusiveLock);
+	tuple = SearchSysCacheCopy1(RELOID,
+								ObjectIdGetDatum(RelationGetRelid(partRel)));
+	if (!HeapTupleIsValid(tuple))
+		elog(ERROR, "cache lookup failed for relation %u",
+			 RelationGetRelid(partRel));
+	Assert(((Form_pg_class) GETSTRUCT(tuple))->relispartition);
+
+	/* Clear relpartbound and reset relispartition */
+	memset(new_val, 0, sizeof(new_val));
+	memset(new_null, false, sizeof(new_null));
+	memset(new_repl, false, sizeof(new_repl));
+	new_val[Anum_pg_class_relpartbound - 1] = (Datum) 0;
+	new_null[Anum_pg_class_relpartbound - 1] = true;
+	new_repl[Anum_pg_class_relpartbound - 1] = true;
+	newtuple = heap_modify_tuple(tuple, RelationGetDescr(classRel),
+								 new_val, new_null, new_repl);
+
+	((Form_pg_class) GETSTRUCT(newtuple))->relispartition = false;
+	CatalogTupleUpdate(classRel, &newtuple->t_self, newtuple);
+	heap_freetuple(newtuple);
+	table_close(classRel, RowExclusiveLock);
+
+	if (OidIsValid(defaultPartOid))
+	{
+		/*
+		 * If the relation being detached is the default partition itself,
+		 * remove it from the parent's pg_partitioned_table entry.
+		 *
+		 * If not, we must invalidate default partition's relcache entry, as
+		 * in StorePartitionBound: its partition constraint depends on every
+		 * other partition's partition constraint.
+		 */
+		if (RelationGetRelid(partRel) == defaultPartOid)
+			update_default_partition_oid(RelationGetRelid(rel), InvalidOid);
+		else
+			CacheInvalidateRelcacheByRelid(defaultPartOid);
+	}
 
 	/*
 	 * Invalidate the parent's relcache so that the partition is no longer
 	 * included in its partition descriptor.
 	 */
 	CacheInvalidateRelcache(rel);
+}
+
+/*
+ * ALTER TABLE ... DETACH PARTITION ... FINALIZE
+ *
+ * To use when a DETACH PARTITION command previously did not run to
+ * completion; this completes the detaching process.
+ */
+static ObjectAddress
+ATExecDetachPartitionFinalize(Relation rel, RangeVar *name)
+{
+	Relation    partRel;
+	ObjectAddress address;
+
+	partRel = table_openrv(name, AccessExclusiveLock);
+
+	DetachPartitionFinalize(rel, partRel, true, InvalidOid, true);
 
 	ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partRel));
 
-	/* keep our lock until commit */
 	table_close(partRel, NoLock);
 
 	return address;
 }
 
+/*
+ * DetachAddConstraintIfNeeded
+ *		Subroutine for ATExecDetachPartition.  Create a constraint that
+ *		takes the place of the partition constraint, but avoid creating
+ *		a dupe if an equivalent constraint already exists.
+ */
+static void
+DetachAddConstraintIfNeeded(List **wqueue, Relation partRel)
+{
+	AlteredTableInfo *tab;
+	Expr	   *constraintExpr;
+	TupleDesc	td = RelationGetDescr(partRel);
+	Constraint *n;
+
+	constraintExpr = make_ands_explicit(RelationGetPartitionQual(partRel));
+
+	/* If an identical constraint exists, we don't need to create one */
+	if (td->constr && td->constr->num_check > 0)
+	{
+		for (int i = 0; i < td->constr->num_check; i++)
+		{
+			Node	*thisconstr;
+
+			thisconstr = stringToNode(td->constr->check[i].ccbin);
+
+			if (equal(constraintExpr, thisconstr))
+				return;
+		}
+	}
+
+	tab = ATGetQueueEntry(wqueue, partRel);
+
+	/* Add constraint on partition, equivalent to the partition constraint */
+	n = makeNode(Constraint);
+	n->contype = CONSTR_CHECK;
+	n->conname = NULL;
+	n->location = -1;
+	n->is_no_inherit = false;
+	n->raw_expr = NULL;
+	n->cooked_expr = nodeToString(constraintExpr);
+	n->initially_valid = true;
+	n->skip_validation = true;
+	/* It's a re-add, since it nominally already exists */
+	ATAddCheckConstraint(wqueue, tab, partRel, n,
+						 true, false, true, ShareUpdateExclusiveLock);
+}
+
 /*
  * DropClonedTriggersFromPartition
  *		subroutine for ATExecDetachPartition to remove any triggers that were
@@ -17327,7 +17682,7 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
 							   RelationGetRelationName(partIdx))));
 
 		/* Make sure it indexes a partition of the other index's table */
-		partDesc = RelationGetPartitionDesc(parentTbl);
+		partDesc = RelationGetPartitionDesc(parentTbl, false);
 		found = false;
 		for (i = 0; i < partDesc->nparts; i++)
 		{
@@ -17481,7 +17836,7 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl)
 	 * If we found as many inherited indexes as the partitioned table has
 	 * partitions, we're good; update pg_index to set indisvalid.
 	 */
-	if (tuples == RelationGetPartitionDesc(partedTbl)->nparts)
+	if (tuples == RelationGetPartitionDesc(partedTbl, false)->nparts)
 	{
 		Relation	idxRel;
 		HeapTuple	newtup;
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 4e4e05844c..7383d5994e 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -1119,7 +1119,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
 	 */
 	if (partition_recurse)
 	{
-		PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+		PartitionDesc partdesc = RelationGetPartitionDesc(rel, false);
 		List	   *idxs = NIL;
 		List	   *childTbls = NIL;
 		ListCell   *l;
@@ -1141,7 +1141,8 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
 			ListCell   *l;
 			List	   *idxs = NIL;
 
-			idxs = find_inheritance_children(indexOid, ShareRowExclusiveLock);
+			idxs = find_inheritance_children(indexOid, false,
+											 ShareRowExclusiveLock);
 			foreach(l, idxs)
 				childTbls = lappend_oid(childTbls,
 										IndexGetRelation(lfirst_oid(l),
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index b8da4c5967..619aaffae4 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -569,6 +569,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
 					  int partidx)
 {
 	ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
+	Oid			partOid = dispatch->partdesc->oids[partidx];
 	Relation	partrel;
 	int			firstVarno = mtstate->resultRelInfo[0].ri_RangeTableIndex;
 	Relation	firstResultRel = mtstate->resultRelInfo[0].ri_RelationDesc;
@@ -579,7 +580,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
 
 	oldcxt = MemoryContextSwitchTo(proute->memcxt);
 
-	partrel = table_open(dispatch->partdesc->oids[partidx], RowExclusiveLock);
+	partrel = table_open(partOid, RowExclusiveLock);
 
 	leaf_part_rri = makeNode(ResultRelInfo);
 	InitResultRelInfo(leaf_part_rri,
@@ -1065,9 +1066,21 @@ ExecInitPartitionDispatchInfo(EState *estate,
 	int			dispatchidx;
 	MemoryContext oldcxt;
 
+	/*
+	 * For data modification, it is better that executor does not include
+	 * partitions being detached, except in snapshot-isolation mode.  This
+	 * means that a read-committed transaction immediately gets a "no
+	 * partition for tuple" error when a tuple is inserted into a partition
+	 * that's being detached concurrently, but a transaction in repeatable-
+	 * read mode can still use the partition.  Note that because partition
+	 * detach uses ShareLock on the partition (which conflicts with DML),
+	 * we're certain that the detach won't be able to complete until any
+	 * inserting transaction is done.
+	 */
 	if (estate->es_partition_directory == NULL)
 		estate->es_partition_directory =
-			CreatePartitionDirectory(estate->es_query_cxt);
+			CreatePartitionDirectory(estate->es_query_cxt,
+									 IsolationUsesXactSnapshot());
 
 	oldcxt = MemoryContextSwitchTo(proute->memcxt);
 
@@ -1645,9 +1658,10 @@ ExecCreatePartitionPruneState(PlanState *planstate,
 	ListCell   *lc;
 	int			i;
 
+	/* Executor must always include detached partitions */
 	if (estate->es_partition_directory == NULL)
 		estate->es_partition_directory =
-			CreatePartitionDirectory(estate->es_query_cxt);
+			CreatePartitionDirectory(estate->es_query_cxt, true);
 
 	n_part_hierarchies = list_length(partitionpruneinfo->prune_infos);
 	Assert(n_part_hierarchies > 0);
@@ -1713,9 +1727,12 @@ ExecCreatePartitionPruneState(PlanState *planstate,
 												partrel);
 
 			/*
-			 * Initialize the subplan_map and subpart_map.  Since detaching a
-			 * partition requires AccessExclusiveLock, no partitions can have
-			 * disappeared, nor can the bounds for any partition have changed.
+			 * Initialize the subplan_map and subpart_map.
+			 *
+			 * Because we request detached partitions to be included, and
+			 * detaching waits for old transactions, it is safe to assume that
+			 * no partitions have disappeared since this query was planned.
+			 *
 			 * However, new partitions may have been added.
 			 */
 			Assert(partdesc->nparts >= pinfo->nparts);
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index da91cbd2b1..850e50f1a2 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4735,6 +4735,7 @@ _copyPartitionCmd(const PartitionCmd *from)
 
 	COPY_NODE_FIELD(name);
 	COPY_NODE_FIELD(bound);
+	COPY_SCALAR_FIELD(concurrent);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index c2d73626fc..250a23b347 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2972,6 +2972,7 @@ _equalPartitionCmd(const PartitionCmd *a, const PartitionCmd *b)
 {
 	COMPARE_NODE_FIELD(name);
 	COMPARE_NODE_FIELD(bound);
+	COMPARE_SCALAR_FIELD(concurrent);
 
 	return true;
 }
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 7e25f94293..830341d08e 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -1192,11 +1192,15 @@ target_rel_partitions_max_parallel_hazard(Relation rel,
 
 	/* Recursively check each partition ... */
 
-	/* Create the PartitionDirectory infrastructure if we didn't already */
+	/*
+	 * Create the PartitionDirectory infrastructure if we didn't already. Must
+	 * match args used in set_relation_partition_info.
+	 */
 	glob = context->planner_global;
 	if (glob->partition_directory == NULL)
 		glob->partition_directory =
-			CreatePartitionDirectory(CurrentMemoryContext);
+			CreatePartitionDirectory(CurrentMemoryContext,
+									 IsolationUsesXactSnapshot());
 
 	pdesc = PartitionDirectoryLookup(glob->partition_directory, rel);
 
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index c5947fa418..9ec7d7f518 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -2142,10 +2142,17 @@ set_relation_partition_info(PlannerInfo *root, RelOptInfo *rel,
 {
 	PartitionDesc partdesc;
 
-	/* Create the PartitionDirectory infrastructure if we didn't already */
+	/*
+	 * Create the PartitionDirectory infrastructure if we didn't already.
+	 * In snapshot-isolation transactions, we always include detached
+	 * partitions, because otherwise they become invisible too soon.
+	 */
 	if (root->glob->partition_directory == NULL)
+	{
 		root->glob->partition_directory =
-			CreatePartitionDirectory(CurrentMemoryContext);
+			CreatePartitionDirectory(CurrentMemoryContext,
+									 IsolationUsesXactSnapshot());
+	}
 
 	partdesc = PartitionDirectoryLookup(root->glob->partition_directory,
 										relation);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 652be0b96d..b26d0a3d0b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -646,7 +646,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPRESSION
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -2096,12 +2096,13 @@ partition_cmd:
 					n->subtype = AT_AttachPartition;
 					cmd->name = $3;
 					cmd->bound = $4;
+					cmd->concurrent = false;
 					n->def = (Node *) cmd;
 
 					$$ = (Node *) n;
 				}
-			/* ALTER TABLE <name> DETACH PARTITION <partition_name> */
-			| DETACH PARTITION qualified_name
+			/* ALTER TABLE <name> DETACH PARTITION <partition_name> [CONCURRENTLY] */
+			| DETACH PARTITION qualified_name opt_concurrently
 				{
 					AlterTableCmd *n = makeNode(AlterTableCmd);
 					PartitionCmd *cmd = makeNode(PartitionCmd);
@@ -2109,8 +2110,21 @@ partition_cmd:
 					n->subtype = AT_DetachPartition;
 					cmd->name = $3;
 					cmd->bound = NULL;
+					cmd->concurrent = $4;
 					n->def = (Node *) cmd;
 
+					$$ = (Node *) n;
+				}
+			| DETACH PARTITION qualified_name FINALIZE
+				{
+					AlterTableCmd *n = makeNode(AlterTableCmd);
+					PartitionCmd *cmd = makeNode(PartitionCmd);
+
+					n->subtype = AT_DetachPartitionFinalize;
+					cmd->name = $3;
+					cmd->bound = NULL;
+					cmd->concurrent = false;
+					n->def = (Node *) cmd;
 					$$ = (Node *) n;
 				}
 		;
@@ -2125,6 +2139,7 @@ index_partition_cmd:
 					n->subtype = AT_AttachPartition;
 					cmd->name = $3;
 					cmd->bound = NULL;
+					cmd->concurrent = false;
 					n->def = (Node *) cmd;
 
 					$$ = (Node *) n;
@@ -15348,6 +15363,7 @@ unreserved_keyword:
 			| EXTERNAL
 			| FAMILY
 			| FILTER
+			| FINALIZE
 			| FIRST_P
 			| FOLLOWING
 			| FORCE
@@ -15888,6 +15904,7 @@ bare_label_keyword:
 			| EXTRACT
 			| FALSE_P
 			| FAMILY
+			| FINALIZE
 			| FIRST_P
 			| FLOAT_P
 			| FOLLOWING
diff --git a/src/backend/partitioning/partbounds.c b/src/backend/partitioning/partbounds.c
index e5f3482d52..2b2b1dc1ad 100644
--- a/src/backend/partitioning/partbounds.c
+++ b/src/backend/partitioning/partbounds.c
@@ -2798,7 +2798,7 @@ check_new_partition_bound(char *relname, Relation parent,
 						  PartitionBoundSpec *spec, ParseState *pstate)
 {
 	PartitionKey key = RelationGetPartitionKey(parent);
-	PartitionDesc partdesc = RelationGetPartitionDesc(parent);
+	PartitionDesc partdesc = RelationGetPartitionDesc(parent, true);
 	PartitionBoundInfo boundinfo = partdesc->boundinfo;
 	int			with = -1;
 	bool		overlap = false;
@@ -3990,7 +3990,7 @@ get_qual_for_list(Relation parent, PartitionBoundSpec *spec)
 	{
 		int			i;
 		int			ndatums = 0;
-		PartitionDesc pdesc = RelationGetPartitionDesc(parent);
+		PartitionDesc pdesc = RelationGetPartitionDesc(parent, true);	/* XXX correct? */
 		PartitionBoundInfo boundinfo = pdesc->boundinfo;
 
 		if (boundinfo)
@@ -4190,7 +4190,7 @@ get_qual_for_range(Relation parent, PartitionBoundSpec *spec,
 	if (spec->is_default)
 	{
 		List	   *or_expr_args = NIL;
-		PartitionDesc pdesc = RelationGetPartitionDesc(parent);
+		PartitionDesc pdesc = RelationGetPartitionDesc(parent, true);	/* XXX correct? */
 		Oid		   *inhoids = pdesc->oids;
 		int			nparts = pdesc->nparts,
 					i;
diff --git a/src/backend/partitioning/partdesc.c b/src/backend/partitioning/partdesc.c
index f852b6e99d..58570fecfd 100644
--- a/src/backend/partitioning/partdesc.c
+++ b/src/backend/partitioning/partdesc.c
@@ -37,6 +37,7 @@ typedef struct PartitionDirectoryData
 {
 	MemoryContext pdir_mcxt;
 	HTAB	   *pdir_hash;
+	bool		include_detached;
 }			PartitionDirectoryData;
 
 typedef struct PartitionDirectoryEntry
@@ -46,7 +47,7 @@ typedef struct PartitionDirectoryEntry
 	PartitionDesc pd;
 } PartitionDirectoryEntry;
 
-static void RelationBuildPartitionDesc(Relation rel);
+static void RelationBuildPartitionDesc(Relation rel, bool include_detached);
 
 
 /*
@@ -61,13 +62,14 @@ static void RelationBuildPartitionDesc(Relation rel);
  * that the data doesn't become stale.
  */
 PartitionDesc
-RelationGetPartitionDesc(Relation rel)
+RelationGetPartitionDesc(Relation rel, bool include_detached)
 {
 	if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
 		return NULL;
 
-	if (unlikely(rel->rd_partdesc == NULL))
-		RelationBuildPartitionDesc(rel);
+	if (unlikely(rel->rd_partdesc == NULL ||
+				 rel->rd_partdesc->includes_detached != include_detached))
+		RelationBuildPartitionDesc(rel, include_detached);
 
 	return rel->rd_partdesc;
 }
@@ -88,7 +90,7 @@ RelationGetPartitionDesc(Relation rel)
  * permanently.
  */
 static void
-RelationBuildPartitionDesc(Relation rel)
+RelationBuildPartitionDesc(Relation rel, bool include_detached)
 {
 	PartitionDesc partdesc;
 	PartitionBoundInfo boundinfo = NULL;
@@ -110,7 +112,8 @@ RelationBuildPartitionDesc(Relation rel)
 	 * concurrently, whatever this function returns will be accurate as of
 	 * some well-defined point in time.
 	 */
-	inhoids = find_inheritance_children(RelationGetRelid(rel), NoLock);
+	inhoids = find_inheritance_children(RelationGetRelid(rel), include_detached,
+										NoLock);
 	nparts = list_length(inhoids);
 
 	/* Allocate working arrays for OIDs, leaf flags, and boundspecs. */
@@ -238,6 +241,7 @@ RelationBuildPartitionDesc(Relation rel)
 		partdesc->boundinfo = partition_bounds_copy(boundinfo, key);
 		partdesc->oids = (Oid *) palloc(nparts * sizeof(Oid));
 		partdesc->is_leaf = (bool *) palloc(nparts * sizeof(bool));
+		partdesc->includes_detached = include_detached;
 
 		/*
 		 * Assign OIDs from the original array into mapped indexes of the
@@ -280,7 +284,7 @@ RelationBuildPartitionDesc(Relation rel)
  *		Create a new partition directory object.
  */
 PartitionDirectory
-CreatePartitionDirectory(MemoryContext mcxt)
+CreatePartitionDirectory(MemoryContext mcxt, bool include_detached)
 {
 	MemoryContext oldcontext = MemoryContextSwitchTo(mcxt);
 	PartitionDirectory pdir;
@@ -295,6 +299,7 @@ CreatePartitionDirectory(MemoryContext mcxt)
 
 	pdir->pdir_hash = hash_create("partition directory", 256, &ctl,
 								  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	pdir->include_detached = include_detached;
 
 	MemoryContextSwitchTo(oldcontext);
 	return pdir;
@@ -327,7 +332,7 @@ PartitionDirectoryLookup(PartitionDirectory pdir, Relation rel)
 		 */
 		RelationIncrementReferenceCount(rel);
 		pde->rel = rel;
-		pde->pd = RelationGetPartitionDesc(rel);
+		pde->pd = RelationGetPartitionDesc(rel, pdir->include_detached);
 		Assert(pde->pd != NULL);
 	}
 	return pde->pd;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 05bb698cf4..729274b330 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1236,6 +1236,25 @@ ProcessUtilitySlow(ParseState *pstate,
 					AlterTableStmt *atstmt = (AlterTableStmt *) parsetree;
 					Oid			relid;
 					LOCKMODE	lockmode;
+					ListCell   *cell;
+
+					/*
+					 * Disallow ALTER TABLE .. DETACH CONCURRENTLY in a
+					 * transaction block or function.  (Perhaps it could be
+					 * allowed in a procedure, but don't hold your breath.)
+					 */
+					foreach(cell, atstmt->cmds)
+					{
+						AlterTableCmd *cmd = (AlterTableCmd *) lfirst(cell);
+
+						/* Disallow DETACH CONCURRENTLY in a transaction block */
+						if (cmd->subtype == AT_DetachPartition)
+						{
+							if (((PartitionCmd *) cmd->def)->concurrent)
+								PreventInTransactionBlock(isTopLevel,
+														  "ALTER TABLE ... DETACH CONCURRENTLY");
+						}
+					}
 
 					/*
 					 * Figure out lock mode, and acquire lock.  This also does
diff --git a/src/backend/utils/cache/partcache.c b/src/backend/utils/cache/partcache.c
index a6388d980e..74d85b89fb 100644
--- a/src/backend/utils/cache/partcache.c
+++ b/src/backend/utils/cache/partcache.c
@@ -341,6 +341,7 @@ generate_partition_qual(Relation rel)
 	bool		isnull;
 	List	   *my_qual = NIL,
 			   *result = NIL;
+	Oid			parentrelid;
 	Relation	parent;
 
 	/* Guard against stack overflow due to overly deep partition tree */
@@ -350,9 +351,19 @@ generate_partition_qual(Relation rel)
 	if (rel->rd_partcheckvalid)
 		return copyObject(rel->rd_partcheck);
 
+	/*
+	 * Obtain parent relid; if it's invalid, then the partition is being
+	 * detached.  The constraint is NIL in that case, and we cache that.
+	 */
+	parentrelid = get_partition_parent(RelationGetRelid(rel));
+	if (parentrelid == InvalidOid)
+	{
+		rel->rd_partcheckvalid = true;
+		return NIL;
+	}
+
 	/* Grab at least an AccessShareLock on the parent table */
-	parent = relation_open(get_partition_parent(RelationGetRelid(rel)),
-						   AccessShareLock);
+	parent = relation_open(parentrelid, AccessShareLock);
 
 	/* Get pg_class.relpartbound */
 	tuple = SearchSysCache1(RELOID, RelationGetRelid(rel));
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 20af5a92b4..a8d64014f0 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2117,7 +2117,12 @@ describeOneTableDetails(const char *schemaname,
 
 		printfPQExpBuffer(&buf,
 						  "SELECT inhparent::pg_catalog.regclass,\n"
-						  "  pg_catalog.pg_get_expr(c.relpartbound, c.oid)");
+						  "  pg_catalog.pg_get_expr(c.relpartbound, c.oid),\n  ");
+
+		appendPQExpBuffer(&buf,
+						  pset.sversion >= 140000 ? "inhdetached" :
+						  "false as inhdetached");
+
 		/* If verbose, also request the partition constraint definition */
 		if (verbose)
 			appendPQExpBufferStr(&buf,
@@ -2135,17 +2140,19 @@ describeOneTableDetails(const char *schemaname,
 		{
 			char	   *parent_name = PQgetvalue(result, 0, 0);
 			char	   *partdef = PQgetvalue(result, 0, 1);
+			char	   *detached = PQgetvalue(result, 0, 2);
 
-			printfPQExpBuffer(&tmpbuf, _("Partition of: %s %s"), parent_name,
-							  partdef);
+			printfPQExpBuffer(&tmpbuf, _("Partition of: %s %s%s"), parent_name,
+							  partdef,
+							  strcmp(detached, "t") == 0 ? " DETACHED" : "");
 			printTableAddFooter(&cont, tmpbuf.data);
 
 			if (verbose)
 			{
 				char	   *partconstraintdef = NULL;
 
-				if (!PQgetisnull(result, 0, 2))
-					partconstraintdef = PQgetvalue(result, 0, 2);
+				if (!PQgetisnull(result, 0, 3))
+					partconstraintdef = PQgetvalue(result, 0, 3);
 				/* If there isn't any constraint, show that explicitly */
 				if (partconstraintdef == NULL || partconstraintdef[0] == '\0')
 					printfPQExpBuffer(&tmpbuf, _("No partition constraint"));
@@ -3197,9 +3204,18 @@ describeOneTableDetails(const char *schemaname,
 		}
 
 		/* print child tables (with additional info if partitions) */
-		if (pset.sversion >= 100000)
+		if (pset.sversion >= 140000)
 			printfPQExpBuffer(&buf,
-							  "SELECT c.oid::pg_catalog.regclass, c.relkind,"
+							  "SELECT c.oid::pg_catalog.regclass, c.relkind, inhdetached,"
+							  " pg_catalog.pg_get_expr(c.relpartbound, c.oid)\n"
+							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
+							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
+							  "ORDER BY pg_catalog.pg_get_expr(c.relpartbound, c.oid) = 'DEFAULT',"
+							  " c.oid::pg_catalog.regclass::pg_catalog.text;",
+							  oid);
+		else if (pset.sversion >= 100000)
+			printfPQExpBuffer(&buf,
+							  "SELECT c.oid::pg_catalog.regclass, c.relkind, false AS inhdetached,"
 							  " pg_catalog.pg_get_expr(c.relpartbound, c.oid)\n"
 							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
 							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
@@ -3208,14 +3224,14 @@ describeOneTableDetails(const char *schemaname,
 							  oid);
 		else if (pset.sversion >= 80300)
 			printfPQExpBuffer(&buf,
-							  "SELECT c.oid::pg_catalog.regclass, c.relkind, NULL\n"
+							  "SELECT c.oid::pg_catalog.regclass, c.relkind, false AS inhdetached, NULL\n"
 							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
 							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
 							  "ORDER BY c.oid::pg_catalog.regclass::pg_catalog.text;",
 							  oid);
 		else
 			printfPQExpBuffer(&buf,
-							  "SELECT c.oid::pg_catalog.regclass, c.relkind, NULL\n"
+							  "SELECT c.oid::pg_catalog.regclass, c.relkind, false AS inhdetached, NULL\n"
 							  "FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i\n"
 							  "WHERE c.oid = i.inhrelid AND i.inhparent = '%s'\n"
 							  "ORDER BY c.relname;",
@@ -3265,11 +3281,13 @@ describeOneTableDetails(const char *schemaname,
 				else
 					printfPQExpBuffer(&buf, "%*s  %s",
 									  ctw, "", PQgetvalue(result, i, 0));
-				if (!PQgetisnull(result, i, 2))
-					appendPQExpBuffer(&buf, " %s", PQgetvalue(result, i, 2));
+				if (!PQgetisnull(result, i, 3))
+					appendPQExpBuffer(&buf, " %s", PQgetvalue(result, i, 3));
 				if (child_relkind == RELKIND_PARTITIONED_TABLE ||
 					child_relkind == RELKIND_PARTITIONED_INDEX)
 					appendPQExpBufferStr(&buf, ", PARTITIONED");
+				if (strcmp(PQgetvalue(result, i, 2), "t") == 0)
+					appendPQExpBuffer(&buf, " (DETACHED)");
 				if (i < tuples - 1)
 					appendPQExpBufferChar(&buf, ',');
 
diff --git a/src/include/catalog/pg_inherits.h b/src/include/catalog/pg_inherits.h
index 2b71cad9a2..155596907c 100644
--- a/src/include/catalog/pg_inherits.h
+++ b/src/include/catalog/pg_inherits.h
@@ -34,6 +34,7 @@ CATALOG(pg_inherits,2611,InheritsRelationId)
 	Oid			inhrelid BKI_LOOKUP(pg_class);
 	Oid			inhparent BKI_LOOKUP(pg_class);
 	int32		inhseqno;
+	bool		inhdetached;
 } FormData_pg_inherits;
 
 /* ----------------
@@ -49,7 +50,8 @@ DECLARE_INDEX(pg_inherits_parent_index, 2187, on pg_inherits using btree(inhpare
 #define InheritsParentIndexId  2187
 
 
-extern List *find_inheritance_children(Oid parentrelId, LOCKMODE lockmode);
+extern List *find_inheritance_children(Oid parentrelId, bool include_detached,
+									   LOCKMODE lockmode);
 extern List *find_all_inheritors(Oid parentrelId, LOCKMODE lockmode,
 								 List **parents);
 extern bool has_subclass(Oid relationId);
@@ -57,6 +59,7 @@ extern bool has_superclass(Oid relationId);
 extern bool typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId);
 extern void StoreSingleInheritance(Oid relationId, Oid parentOid,
 								   int32 seqNumber);
-extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent);
+extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent, bool allow_detached,
+								const char *childname);
 
 #endif							/* PG_INHERITS_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 236832a2ca..f97105ed3a 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -860,6 +860,7 @@ typedef struct PartitionCmd
 	NodeTag		type;
 	RangeVar   *name;			/* name of partition to attach/detach */
 	PartitionBoundSpec *bound;	/* FOR VALUES, if attaching */
+	bool		concurrent;
 } PartitionCmd;
 
 /****************************************************************************
@@ -1896,6 +1897,7 @@ typedef enum AlterTableType
 	AT_GenericOptions,			/* OPTIONS (...) */
 	AT_AttachPartition,			/* ATTACH PARTITION */
 	AT_DetachPartition,			/* DETACH PARTITION */
+	AT_DetachPartitionFinalize,	/* DETACH PARTITION FINALIZE */
 	AT_AddIdentity,				/* ADD IDENTITY */
 	AT_SetIdentity,				/* SET identity column options */
 	AT_DropIdentity,			/* DROP IDENTITY */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 28083aaac9..7da248a385 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -165,6 +165,7 @@ PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD, AS_LABEL)
+PG_KEYWORD("finalize", FINALIZE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("following", FOLLOWING, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/partitioning/partdesc.h b/src/include/partitioning/partdesc.h
index ff113199e5..7f03ff4271 100644
--- a/src/include/partitioning/partdesc.h
+++ b/src/include/partitioning/partdesc.h
@@ -21,6 +21,7 @@
 typedef struct PartitionDescData
 {
 	int			nparts;			/* Number of partitions */
+	bool		includes_detached;	/* Does it include detached partitions */
 	Oid		   *oids;			/* Array of 'nparts' elements containing
 								 * partition OIDs in order of the their bounds */
 	bool	   *is_leaf;		/* Array of 'nparts' elements storing whether
@@ -30,9 +31,9 @@ typedef struct PartitionDescData
 } PartitionDescData;
 
 
-extern PartitionDesc RelationGetPartitionDesc(Relation rel);
+extern PartitionDesc RelationGetPartitionDesc(Relation rel, bool include_detached);
 
-extern PartitionDirectory CreatePartitionDirectory(MemoryContext mcxt);
+extern PartitionDirectory CreatePartitionDirectory(MemoryContext mcxt, bool include_detached);
 extern PartitionDesc PartitionDirectoryLookup(PartitionDirectory, Relation);
 extern void DestroyPartitionDirectory(PartitionDirectory pdir);
 
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 2c8b881a09..901eccec25 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -133,6 +133,7 @@ extern void AtEOXact_Snapshot(bool isCommit, bool resetXmin);
 extern void ImportSnapshot(const char *idstr);
 extern bool XactHasExportedSnapshots(void);
 extern void DeleteAllExportedSnapshotFiles(void);
+extern void WaitForOlderSnapshots(TransactionId limitXmin, bool progress);
 extern bool ThereAreNoPriorRegisteredSnapshots(void);
 extern bool TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
 												Relation relation,
diff --git a/src/test/isolation/expected/detach-partition-concurrently-1.out b/src/test/isolation/expected/detach-partition-concurrently-1.out
new file mode 100644
index 0000000000..8d15beac60
--- /dev/null
+++ b/src/test/isolation/expected/detach-partition-concurrently-1.out
@@ -0,0 +1,301 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1b s1s s2detach s1s s1c s1s
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+
+starting permutation: s1b s1s s2detach s1s s3s s3i s1c s3i s2drop s1s
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+step s3s: SELECT * FROM d_listp;
+a              
+
+1              
+step s3i: SELECT relpartbound IS NULL FROM pg_class where relname = 'd_listp2';
+?column?       
+
+f              
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s3i: SELECT relpartbound IS NULL FROM pg_class where relname = 'd_listp2';
+?column?       
+
+t              
+step s2drop: DROP TABLE d_listp2;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+
+starting permutation: s1b s1s s2detach s1ins s1s s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1ins: INSERT INTO d_listp VALUES (1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+1              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1b s1s s1ins2 s2detach s1ins s1s s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1ins2: INSERT INTO d_listp VALUES (2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1ins: INSERT INTO d_listp VALUES (1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+1              
+2              
+2              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1brr s1s s2detach s1ins s1s s1c
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1ins: INSERT INTO d_listp VALUES (1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+1              
+2              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1brr s1s s2detach s1s s1c
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1b s1ins2 s2detach s3ins2 s1c s3debugtest
+step s1b: BEGIN;
+step s1ins2: INSERT INTO d_listp VALUES (2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s3ins2: INSERT INTO d_listp VALUES (2); <waiting ...>
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s3ins2: <... completed>
+step s3debugtest: SELECT tableoid::regclass FROM d_listp; SELECT * from d_listp2;
+tableoid       
+
+d_listp1       
+a              
+
+2              
+2              
+2              
+
+starting permutation: s1brr s1prep s1s s2detach s1s s1exec1 s3s s1dealloc s1c
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec1: EXECUTE f(1);
+step s3s: SELECT * FROM d_listp;
+a              
+
+1              
+step s1dealloc: DEALLOCATE f;
+step s1c: COMMIT;
+step s2detach: <... completed>
+
+starting permutation: s1brr s1prep s1exec2 s2detach s1s s1exec2 s3s s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s1exec2: EXECUTE f(2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+2              
+step s1exec2: EXECUTE f(2);
+step s3s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep s1s s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep: PREPARE f(int) AS INSERT INTO d_listp VALUES ($1);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep1 s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep1: PREPARE f(int) AS INSERT INTO d_listp VALUES (1);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1brr s1prep2 s2detach s1s s1exec2 s1c s1dealloc
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1prep2: PREPARE f(int) AS INSERT INTO d_listp VALUES (2);
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s1exec2: EXECUTE f(2);
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s1dealloc: DEALLOCATE f;
+
+starting permutation: s1b s1s s2detach s3drop1 s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s3drop1: DROP TABLE d_listp; <waiting ...>
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s3drop1: <... completed>
+error in steps s1c s2detach s3drop1: ERROR:  partitioned table "d_listp" was removed concurrently
+
+starting permutation: s1b s1s s2detach s3drop2 s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s3drop2: DROP TABLE d_listp2; <waiting ...>
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s3drop2: <... completed>
+error in steps s1c s2detach s3drop2: ERROR:  partition "d_listp2" was removed concurrently
+
+starting permutation: s1b s1s s2detach s3detach s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s3detach: ALTER TABLE d_listp DETACH PARTITION d_listp2; <waiting ...>
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s3detach: <... completed>
+error in steps s1c s2detach s3detach: ERROR:  cannot detach partition "d_listp2"
+
+starting permutation: s1b s1s s2detach s3rename s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_listp;
+a              
+
+1              
+2              
+step s2detach: ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; <waiting ...>
+step s3rename: ALTER TABLE d_listp2 RENAME TO d_listp_foobar; <waiting ...>
+step s1c: COMMIT;
+step s2detach: <... completed>
+step s3rename: <... completed>
diff --git a/src/test/isolation/expected/detach-partition-concurrently-2.out b/src/test/isolation/expected/detach-partition-concurrently-2.out
new file mode 100644
index 0000000000..85be707b40
--- /dev/null
+++ b/src/test/isolation/expected/detach-partition-concurrently-2.out
@@ -0,0 +1,66 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1b s1s s2d s3i1 s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s3i1: INSERT INTO d_lp_fk_r VALUES (1);
+ERROR:  insert or update on table "d_lp_fk_r" violates foreign key constraint "d_lp_fk_r_a_fkey"
+step s1c: COMMIT;
+step s2d: <... completed>
+
+starting permutation: s1b s1s s2d s3i2 s3i2 s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s3i2: INSERT INTO d_lp_fk_r VALUES (2);
+step s3i2: INSERT INTO d_lp_fk_r VALUES (2);
+step s1c: COMMIT;
+step s2d: <... completed>
+
+starting permutation: s1b s1s s3i1 s2d s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s3i1: INSERT INTO d_lp_fk_r VALUES (1);
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY;
+ERROR:  removing partition "d_lp_fk_1" violates foreign key constraint "d_lp_fk_r_a_fkey1"
+step s1c: COMMIT;
+
+starting permutation: s1b s1s s3i2 s2d s1c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s3i2: INSERT INTO d_lp_fk_r VALUES (2);
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s1c: COMMIT;
+step s2d: <... completed>
+
+starting permutation: s1b s1s s3b s2d s3i1 s1c s3c
+step s1b: BEGIN;
+step s1s: SELECT * FROM d_lp_fk;
+a              
+
+1              
+2              
+step s3b: BEGIN;
+step s2d: ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; <waiting ...>
+step s3i1: INSERT INTO d_lp_fk_r VALUES (1);
+ERROR:  insert or update on table "d_lp_fk_r" violates foreign key constraint "d_lp_fk_r_a_fkey"
+step s1c: COMMIT;
+step s2d: <... completed>
+step s3c: COMMIT;
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 5d6b79e66e..69928dc540 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -24,6 +24,8 @@ test: deadlock-hard
 test: deadlock-soft
 test: deadlock-soft-2
 test: deadlock-parallel
+test: detach-partition-concurrently-1
+test: detach-partition-concurrently-2
 test: fk-contention
 test: fk-deadlock
 test: fk-deadlock2
diff --git a/src/test/isolation/specs/detach-partition-concurrently-1.spec b/src/test/isolation/specs/detach-partition-concurrently-1.spec
new file mode 100644
index 0000000000..1e9bea8198
--- /dev/null
+++ b/src/test/isolation/specs/detach-partition-concurrently-1.spec
@@ -0,0 +1,90 @@
+# Test that detach partition concurrently makes the partition invisible at the
+# correct time.
+
+setup
+{
+  DROP TABLE IF EXISTS d_listp, d_listp1, d_listp2;
+  CREATE TABLE d_listp (a int) PARTITION BY LIST(a);
+  CREATE TABLE d_listp1 PARTITION OF d_listp FOR VALUES IN (1);
+  CREATE TABLE d_listp2 PARTITION OF d_listp FOR VALUES IN (2);
+  INSERT INTO d_listp VALUES (1),(2);
+}
+
+teardown {
+    DROP TABLE IF EXISTS d_listp, d_listp2, d_listp_foobar;
+}
+
+session "s1"
+step "s1b"		{ BEGIN; }
+step "s1brr"	{ BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1s"		{ SELECT * FROM d_listp; }
+step "s1ins"	{ INSERT INTO d_listp VALUES (1); }
+step "s1ins2"	{ INSERT INTO d_listp VALUES (2); }
+step "s1prep"	{ PREPARE f(int) AS INSERT INTO d_listp VALUES ($1); }
+step "s1prep1"	{ PREPARE f(int) AS INSERT INTO d_listp VALUES (1); }
+step "s1prep2"	{ PREPARE f(int) AS INSERT INTO d_listp VALUES (2); }
+step "s1exec1"	{ EXECUTE f(1); }
+step "s1exec2"	{ EXECUTE f(2); }
+step "s1dealloc" { DEALLOCATE f; }
+step "s1c"		{ COMMIT; }
+
+session "s2"
+step "s2detach"		{ ALTER TABLE d_listp DETACH PARTITION d_listp2 CONCURRENTLY; }
+step "s2drop"	{ DROP TABLE d_listp2; }
+
+session "s3"
+step "s3s"		{ SELECT * FROM d_listp; }
+step "s3i"		{ SELECT relpartbound IS NULL FROM pg_class where relname = 'd_listp2'; }
+step "s3ins2"		{ INSERT INTO d_listp VALUES (2); }
+step "s3drop1"	{ DROP TABLE d_listp; }
+step "s3drop2"	{ DROP TABLE d_listp2; }
+step "s3detach"	{ ALTER TABLE d_listp DETACH PARTITION d_listp2; }
+step "s3rename"	{ ALTER TABLE d_listp2 RENAME TO d_listp_foobar; }
+# XXX remove this step
+step "s3debugtest" { SELECT tableoid::regclass FROM d_listp; SELECT * from d_listp2; }
+
+# The transaction that detaches hangs until it sees any older transaction
+# terminate, as does anybody else.
+permutation "s1b" "s1s" "s2detach" "s1s" "s1c" "s1s"
+
+# relpartbound remains set until s1 commits
+# XXX this could be timing dependent :-(
+permutation "s1b" "s1s" "s2detach" "s1s" "s3s" "s3i" "s1c" "s3i" "s2drop" "s1s"
+
+# In read-committed mode, the partition disappears from view of concurrent
+# transactions immediately.  But if a write lock is held, then the detach
+# has to wait.
+permutation "s1b"   "s1s"          "s2detach" "s1ins" "s1s" "s1c"
+permutation "s1b"   "s1s" "s1ins2" "s2detach" "s1ins" "s1s" "s1c"
+
+# In repeatable-read mode, the partition remains visible until commit even
+# if the to-be-detached partition is not locked for write.
+permutation "s1brr" "s1s"          "s2detach" "s1ins" "s1s" "s1c"
+permutation "s1brr" "s1s"          "s2detach"         "s1s" "s1c"
+
+# Another process trying to acquire a write lock will be blocked behind the
+# detacher
+permutation "s1b" "s1ins2" "s2detach" "s3ins2" "s1c" "s3debugtest"
+
+# a prepared query is not blocked
+permutation "s1brr" "s1prep" "s1s" "s2detach" "s1s" "s1exec1" "s3s" "s1dealloc" "s1c"
+permutation "s1brr" "s1prep" "s1exec2" "s2detach" "s1s" "s1exec2" "s3s" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep" "s1s" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep1" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
+permutation "s1brr" "s1prep2" "s2detach" "s1s" "s1exec2" "s1c" "s1dealloc"
+
+
+# Note: the following tests are not very interesting in isolationtester because
+# of the way this tool handles multiple sessions: it'll only run the drop/detach/
+# rename commands after s2 is blocked waiting.  It would be more useful to
+# run the DDL when s2 commits its first transaction instead.  Maybe these should
+# become TAP tests someday.
+
+# What happens if the partition or the parent table is dropped while waiting?
+permutation "s1b" "s1s" "s2detach" "s3drop1" "s1c"
+permutation "s1b" "s1s" "s2detach" "s3drop2" "s1c"
+# Also try a non-concurrent detach while the other one is waiting
+permutation "s1b" "s1s" "s2detach" "s3detach" "s1c"
+# and some other DDL
+permutation "s1b" "s1s" "s2detach" "s3rename" "s1c"
diff --git a/src/test/isolation/specs/detach-partition-concurrently-2.spec b/src/test/isolation/specs/detach-partition-concurrently-2.spec
new file mode 100644
index 0000000000..9281c80a69
--- /dev/null
+++ b/src/test/isolation/specs/detach-partition-concurrently-2.spec
@@ -0,0 +1,41 @@
+# Test that detach partition concurrently makes the partition safe
+# for foreign keys that reference it.
+
+setup
+{
+  DROP TABLE IF EXISTS d_lp_fk, d_lp_fk_1, d_lp_fk_2, d_lp_fk_r;
+
+  CREATE TABLE d_lp_fk (a int PRIMARY KEY) PARTITION BY LIST(a);
+  CREATE TABLE d_lp_fk_1 PARTITION OF d_lp_fk FOR VALUES IN (1);
+  CREATE TABLE d_lp_fk_2 PARTITION OF d_lp_fk FOR VALUES IN (2);
+  INSERT INTO d_lp_fk VALUES (1), (2);
+
+  CREATE TABLE d_lp_fk_r (a int references d_lp_fk);
+}
+
+teardown { DROP TABLE IF EXISTS d_lp_fk, d_lp_fk_1, d_lp_fk_2, d_lp_fk_r; }
+
+session "s1"
+step "s1b"		{ BEGIN; }
+step "s1s"		{ SELECT * FROM d_lp_fk; }
+step "s1c"		{ COMMIT; }
+
+session "s2"
+step "s2d"		{ ALTER TABLE d_lp_fk DETACH PARTITION d_lp_fk_1 CONCURRENTLY; }
+
+session "s3"
+step "s3b"		{ BEGIN; }
+step "s3i1"		{ INSERT INTO d_lp_fk_r VALUES (1); }
+step "s3i2"		{ INSERT INTO d_lp_fk_r VALUES (2); }
+step "s3c"		{ COMMIT; }
+
+# The transaction that detaches hangs until it sees any older transaction
+# terminate.
+permutation "s1b" "s1s" "s2d" "s3i1" "s1c"
+permutation "s1b" "s1s" "s2d" "s3i2" "s3i2" "s1c"
+
+permutation "s1b" "s1s" "s3i1" "s2d" "s1c"
+permutation "s1b" "s1s" "s3i2" "s2d" "s1c"
+
+# what if s3 has an uncommitted insertion?
+permutation "s1b" "s1s" "s3b" "s2d" "s3i1" "s1c" "s3c"
diff --git a/src/test/modules/delay_execution/Makefile b/src/test/modules/delay_execution/Makefile
index f270aebf3a..70f24e846d 100644
--- a/src/test/modules/delay_execution/Makefile
+++ b/src/test/modules/delay_execution/Makefile
@@ -7,7 +7,8 @@ OBJS = \
 	$(WIN32RES) \
 	delay_execution.o
 
-ISOLATION = partition-addition
+ISOLATION = partition-addition \
+	    partition-removal-1
 
 ifdef USE_PGXS
 PG_CONFIG = pg_config
diff --git a/src/test/modules/delay_execution/expected/partition-removal-1.out b/src/test/modules/delay_execution/expected/partition-removal-1.out
new file mode 100644
index 0000000000..00b93ecf65
--- /dev/null
+++ b/src/test/modules/delay_execution/expected/partition-removal-1.out
@@ -0,0 +1,79 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s3lock s1b s1exec s2remp s3unlock s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1b: BEGIN;
+step s1exec: SELECT * FROM partrem WHERE a <> 1 AND a <> (SELECT 3); <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec: <... completed>
+a              b              
+
+2              JKL            
+step s1c: COMMIT;
+step s2remp: <... completed>
+
+starting permutation: s3lock s1brr s1exec s2remp s3unlock s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1exec: SELECT * FROM partrem WHERE a <> 1 AND a <> (SELECT 3); <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec: <... completed>
+a              b              
+
+2              JKL            
+step s1c: COMMIT;
+step s2remp: <... completed>
+
+starting permutation: s3lock s1b s1exec2 s2remp s3unlock s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1b: BEGIN;
+step s1exec2: SELECT * FROM partrem WHERE a <> (SELECT 2); <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec2: <... completed>
+a              b              
+
+1              ABC            
+3              DEF            
+step s1c: COMMIT;
+step s2remp: <... completed>
+
+starting permutation: s3lock s1brr s1exec2 s2remp s3unlock s1c
+step s3lock: SELECT pg_advisory_lock(12543);
+pg_advisory_lock
+
+               
+step s1brr: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s1exec2: SELECT * FROM partrem WHERE a <> (SELECT 2); <waiting ...>
+step s2remp: ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; <waiting ...>
+step s3unlock: SELECT pg_advisory_unlock(12543);
+pg_advisory_unlock
+
+t              
+step s1exec2: <... completed>
+a              b              
+
+1              ABC            
+3              DEF            
+step s1c: COMMIT;
+step s2remp: <... completed>
diff --git a/src/test/modules/delay_execution/specs/partition-removal-1.spec b/src/test/modules/delay_execution/specs/partition-removal-1.spec
new file mode 100644
index 0000000000..e8e99f15bf
--- /dev/null
+++ b/src/test/modules/delay_execution/specs/partition-removal-1.spec
@@ -0,0 +1,45 @@
+# Test removal of a partition with less-than-exclusive locking.
+
+setup
+{
+  CREATE TABLE partrem (a int, b text) PARTITION BY LIST(a);
+  CREATE TABLE partrem1 PARTITION OF partrem FOR VALUES IN (1);
+  CREATE TABLE partrem2 PARTITION OF partrem FOR VALUES IN (2);
+  CREATE TABLE partrem3 PARTITION OF partrem FOR VALUES IN (3);
+  INSERT INTO partrem VALUES (1, 'ABC');
+  INSERT INTO partrem VALUES (2, 'JKL');
+  INSERT INTO partrem VALUES (3, 'DEF');
+}
+
+teardown
+{
+  DROP TABLE IF EXISTS partrem, partrem2;
+}
+
+# The SELECT will be planned with all three partitions shown above,
+# of which we expect partrem1 to be pruned at planning and partrem3 at
+# execution. Then we'll block, and by the time the query is actually
+# executed, detach of partrem2 is already underway; however we expect
+# its rows to still appear in the result.
+
+session "s1"
+setup		{ LOAD 'delay_execution';
+		  SET delay_execution.post_planning_lock_id = 12543; }
+step "s1b"	{ BEGIN; }
+step "s1brr"	{ BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step "s1exec"	{ SELECT * FROM partrem WHERE a <> 1 AND a <> (SELECT 3); }
+step "s1exec2"	{ SELECT * FROM partrem WHERE a <> (SELECT 2); }
+step "s1c"	{ COMMIT; }
+
+session "s2"
+step "s2remp"	{ ALTER TABLE partrem DETACH PARTITION partrem2 CONCURRENTLY; }
+
+session "s3"
+step "s3lock"	{ SELECT pg_advisory_lock(12543); }
+step "s3unlock"	{ SELECT pg_advisory_unlock(12543); }
+
+permutation "s3lock" "s1b" "s1exec" "s2remp" "s3unlock" "s1c"
+permutation "s3lock" "s1brr" "s1exec" "s2remp" "s3unlock" "s1c"
+
+permutation "s3lock" "s1b" "s1exec2" "s2remp" "s3unlock" "s1c"
+permutation "s3lock" "s1brr" "s1exec2" "s2remp" "s3unlock" "s1c"

Reply via email to