*** a/src/backend/catalog/heap.c
--- b/src/backend/catalog/heap.c
***************
*** 63,68 ****
--- 63,69 ----
  #include "parser/parse_relation.h"
  #include "storage/bufmgr.h"
  #include "storage/freespace.h"
+ #include "storage/predicate.h"
  #include "storage/smgr.h"
  #include "utils/acl.h"
  #include "utils/builtins.h"
***************
*** 1658,1663 **** heap_drop_with_catalog(Oid relid)
--- 1659,1672 ----
  	CheckTableNotInUse(rel, "DROP TABLE");
  
  	/*
+ 	 * This effectively deletes all rows in the table, and may be done in a
+ 	 * serializable transaction.  In that case we must record a rw-conflict in
+ 	 * to this transaction from each transaction holding a predicate lock on
+ 	 * the table.
+ 	 */
+ 	CheckTableForSerializableConflictIn(rel);
+ 
+ 	/*
  	 * Delete pg_foreign_table tuple first.
  	 */
  	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
*** a/src/backend/catalog/index.c
--- b/src/backend/catalog/index.c
***************
*** 54,59 ****
--- 54,60 ----
  #include "parser/parser.h"
  #include "storage/bufmgr.h"
  #include "storage/lmgr.h"
+ #include "storage/predicate.h"
  #include "storage/procarray.h"
  #include "storage/smgr.h"
  #include "utils/builtins.h"
***************
*** 1311,1316 **** index_drop(Oid indexId)
--- 1312,1323 ----
  	CheckTableNotInUse(userIndexRelation, "DROP INDEX");
  
  	/*
+ 	 * All predicate locks on the index are about to be made invalid.
+ 	 * Promote them to relation locks on the heap.
+ 	 */
+ 	TransferPredicateLocksToHeapRelation(userIndexRelation);
+ 
+ 	/*
  	 * Schedule physical removal of the files
  	 */
  	RelationDropStorage(userIndexRelation);
***************
*** 2787,2792 **** reindex_index(Oid indexId, bool skip_constraint_checks)
--- 2794,2805 ----
  	 */
  	CheckTableNotInUse(iRel, "REINDEX INDEX");
  
+ 	/*
+ 	 * All predicate locks on the index are about to be made invalid.
+ 	 * Promote them to relation locks on the heap.
+ 	 */
+ 	TransferPredicateLocksToHeapRelation(iRel);
+ 
  	PG_TRY();
  	{
  		/* Suppress use of the target index while rebuilding it */
*** a/src/backend/commands/cluster.c
--- b/src/backend/commands/cluster.c
***************
*** 39,44 ****
--- 39,45 ----
  #include "optimizer/planner.h"
  #include "storage/bufmgr.h"
  #include "storage/lmgr.h"
+ #include "storage/predicate.h"
  #include "storage/procarray.h"
  #include "storage/smgr.h"
  #include "utils/acl.h"
***************
*** 385,390 **** cluster_rel(Oid tableOid, Oid indexOid, bool recheck, bool verbose,
--- 386,397 ----
  	if (OidIsValid(indexOid))
  		check_index_is_clusterable(OldHeap, indexOid, recheck, AccessExclusiveLock);
  
+ 	/*
+ 	 * All predicate locks on the table and its indexes are about to be made
+ 	 * invalid.  Promote them to relation locks on the heap.
+ 	 */
+ 	TransferPredicateLocksToHeapRelation(OldHeap);
+ 
  	/* rebuild_relation does all the dirty work */
  	rebuild_relation(OldHeap, indexOid, freeze_min_age, freeze_table_age,
  					 verbose);
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 70,75 ****
--- 70,76 ----
  #include "storage/bufmgr.h"
  #include "storage/lmgr.h"
  #include "storage/lock.h"
+ #include "storage/predicate.h"
  #include "storage/smgr.h"
  #include "utils/acl.h"
  #include "utils/builtins.h"
***************
*** 1040,1045 **** ExecuteTruncate(TruncateStmt *stmt)
--- 1041,1054 ----
  			Oid			toast_relid;
  
  			/*
+ 			 * This effectively deletes all rows in the table, and may be done
+ 			 * in a serializable transaction.  In that case we must record a
+ 			 * rw-conflict in to this transaction from each transaction
+ 			 * holding a predicate lock on the table.
+ 			 */
+ 			CheckTableForSerializableConflictIn(rel);
+ 
+ 			/*
  			 * Need the full transaction-safe pushups.
  			 *
  			 * Create a new empty storage file for the relation, and assign it
*** a/src/backend/storage/lmgr/predicate.c
--- b/src/backend/storage/lmgr/predicate.c
***************
*** 155,160 ****
--- 155,161 ----
   *							   BlockNumber newblkno);
   *		PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
   *								 BlockNumber newblkno);
+  *		TransferPredicateLocksToHeapRelation(const Relation relation)
   *		ReleasePredicateLocks(bool isCommit)
   *
   * conflict detection (may also trigger rollback)
***************
*** 162,167 ****
--- 163,169 ----
   *										HeapTupleData *tup, Buffer buffer)
   *		CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
   *									   Buffer buffer)
+  *		CheckTableForSerializableConflictIn(const Relation relation)
   *
   * final rollback checking
   *		PreCommit_CheckForSerializationFailure(void)
***************
*** 189,194 ****
--- 191,197 ----
  #include "storage/procarray.h"
  #include "utils/rel.h"
  #include "utils/snapmgr.h"
+ #include "utils/syscache.h"
  #include "utils/tqual.h"
  
  /* Uncomment the next line to test the graceful degradation code. */
***************
*** 434,439 **** static bool TransferPredicateLocksToNewTarget(const PREDICATELOCKTARGETTAG oldta
--- 437,444 ----
  								  const PREDICATELOCKTARGETTAG newtargettag,
  								  bool removeOld);
  static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
+ static Oid	IfIndexGetRelation(Oid indexId);
+ static void DropAllPredicateLocksFromTableImpl(const Relation relation, bool transfer);
  static void SetNewSxactGlobalXmin(void);
  static bool ReleasePredicateLocksIfROSafe(void);
  static void ClearOldPredicateLocks(void);
***************
*** 2543,2548 **** exit:
--- 2548,2856 ----
  	return !outOfShmem;
  }
  
+ /*
+  * IfIndexGetRelation: given a relation OID, get the OID of the heap
+  * relation it is an index on, or return InvalidOid if the argument is not
+  * an index.	Uses the system cache.
+  */
+ static Oid
+ IfIndexGetRelation(Oid indexId)
+ {
+ 	HeapTuple	tuple;
+ 	Form_pg_index index;
+ 	Oid			result;
+ 
+ 	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexId));
+ 	if (!HeapTupleIsValid(tuple))
+ 		return InvalidOid;
+ 
+ 	index = (Form_pg_index) GETSTRUCT(tuple);
+ 	Assert(index->indexrelid == indexId);
+ 
+ 	result = index->indrelid;
+ 	ReleaseSysCache(tuple);
+ 	return result;
+ }
+ 
+ /*
+  * Drop all predicate locks of any granularity from a heap relation and all of
+  * its indexes, with optional transfer to a relation-level lock on the heap.
+  *
+  * This requires grabbing a lot of LW locks and scanning the entire lock
+  * target table for matches.  That makes this more expensive than most
+  * predicate lock management functions, but it will only be called for DDL
+  * type commands and there are fast returns when no serializable transactions
+  * are active or the relation is temporary.
+  *
+  * We are not using the TransferPredicateLocksToNewTarget function because
+  * it acquires its own locks on the partitions of the two targets involved,
+  * and we'll already be holding all partition locks.
+  *
+  * We can't throw an error from here, because the call could be from a
+  * transaction which is not serializable.
+  *
+  * NOTE: This is currently only called with transfer set to true, but that may
+  * change.	If we decide to clean up the locks from a table on commit of a
+  * transaction which executed DROP TABLE, the false condition will be useful.
+  */
+ static void
+ DropAllPredicateLocksFromTableImpl(const Relation relation, bool transfer)
+ {
+ 	HASH_SEQ_STATUS seqstat;
+ 	PREDICATELOCKTARGET *oldtarget;
+ 	PREDICATELOCKTARGET *heaptarget;
+ 	PREDICATELOCKTARGETTAG heaptargettag;
+ 	PREDICATELOCKTAG newpredlocktag;
+ 	Oid			dbId;
+ 	Oid			indexId;
+ 	Oid			heapId;
+ 	int			i;
+ 	bool		isSingleIndex;
+ 	bool		found;
+ 	uint32		reservedtargettaghash;
+ 	uint32		heaptargettaghash;
+ 
+ 	/*
+ 	 * Bail out quickly if there are no serializable transactions running.
+ 	 * It's safe to check this without taking locks because the caller is
+ 	 * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
+ 	 * would matter here can be acquired while that is held.
+ 	 */
+ 	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
+ 		return;
+ 
+ 	if (SkipSplitTracking(relation))
+ 		return;
+ 
+ 	dbId = relation->rd_node.dbNode;
+ 	if (relation->rd_index == NULL)
+ 	{
+ 		isSingleIndex = false;
+ 		indexId = InvalidOid;	/* quiet compiler warning */
+ 		heapId = relation->rd_id;
+ 	}
+ 	else
+ 	{
+ 		isSingleIndex = true;
+ 		indexId = relation->rd_id;
+ 		heapId = relation->rd_index->indrelid;
+ 	}
+ 	Assert(heapId != InvalidOid);
+ 	Assert(transfer || !isSingleIndex); /* index OID only makes sense with
+ 										 * transfer */
+ 
+ 	SET_PREDICATELOCKTARGETTAG_RELATION(heaptargettag, dbId, heapId);
+ 	heaptargettaghash = PredicateLockTargetTagHashCode(&heaptargettag);
+ 	heaptarget = NULL;			/* Retrieve first time needed, then keep. */
+ 
+ 	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+ 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
+ 		LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
+ 	LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * If there are any locks to be moved, that means there we will wind up
+ 	 * with a relation lock on the heap.  That covers all other locks on the
+ 	 * heap and all locks on all indexes for the table, so we can be very
+ 	 * aggressive about deleting any locks except for the heap relation lock.
+ 	 * But we don't want to add a heap relation lock unless there is at least
+ 	 * one lock that needs to be transferred.  If this function is called with
+ 	 * an index OID, we'll first scan to see if there are any predicate locks
+ 	 * on that index.  As soon as we find one we drop down into the update
+ 	 * loop.  If we find none, we release the LW locks and return without
+ 	 * changing anything.
+ 	 *
+ 	 * This optimization comes into play two ways -- a REINDEX might be done
+ 	 * on an index which has no predicate locks, or an operation might be done
+ 	 * which rewrites the entire table and calls REINDEX on each index. In the
+ 	 * latter case the action against the base table will move all the index
+ 	 * locks before any of the index rebuilds are requested.
+ 	 */
+ 	if (isSingleIndex)
+ 	{
+ 		bool		foundIndexLock = false;
+ 
+ 		hash_seq_init(&seqstat, PredicateLockTargetHash);
+ 		while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
+ 		{
+ 			if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) != indexId)
+ 				continue;		/* wrong OID for the index */
+ 			if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
+ 				continue;		/* wrong database */
+ 			foundIndexLock = true;
+ 			hash_seq_term(&seqstat);
+ 			break;
+ 		}
+ 		if (!foundIndexLock)
+ 		{
+ 			/* Release locks in reverse order */
+ 			LWLockRelease(SerializableXactHashLock);
+ 			for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ 				LWLockRelease(FirstPredicateLockMgrLock + i);
+ 			LWLockRelease(SerializablePredicateLockListLock);
+ 			return;
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Remove the reserved entry to give us scratch space, so we know we'll be
+ 	 * able to create the new lock target.
+ 	 */
+ 	reservedtargettaghash = 0;	/* quiet compiler warning */
+ 	if (transfer)
+ 	{
+ 		reservedtargettaghash = PredicateLockTargetTagHashCode(&ReservedTargetTag);
+ 		hash_search_with_hash_value(PredicateLockTargetHash,
+ 									&ReservedTargetTag,
+ 									reservedtargettaghash,
+ 									HASH_REMOVE, &found);
+ 		Assert(found);
+ 	}
+ 
+ 	/* Scan through PredicateLockHash and copy contents */
+ 	hash_seq_init(&seqstat, PredicateLockTargetHash);
+ 
+ 	while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
+ 	{
+ 		PREDICATELOCK *oldpredlock;
+ 
+ 		/*
+ 		 * Check whether this is a target which needs attention.
+ 		 */
+ 		if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
+ 			continue;			/* wrong database */
+ 		if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) == heapId)
+ 		{
+ 			if (GET_PREDICATELOCKTARGETTAG_TYPE(oldtarget->tag) == PREDLOCKTAG_RELATION)
+ 				continue;		/* already the right lock */
+ 		}
+ 		else
+ 		{
+ 			if (IfIndexGetRelation(GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag)) != heapId)
+ 				continue;		/* not index or index on wrong heap relation */
+ 		}
+ 
+ 		/*
+ 		 * If we made it here, we have work to do.	We make sure the heap
+ 		 * relation lock exists, then we walk the list of predicate locks for
+ 		 * the old target we found, moving all locks to the heap relation lock
+ 		 * -- unless they already hold that.
+ 		 */
+ 
+ 		/*
+ 		 * First make sure we have the heap relation target.  We only need to
+ 		 * do this once.
+ 		 */
+ 		if (transfer && heaptarget == NULL)
+ 		{
+ 			heaptarget = hash_search_with_hash_value(PredicateLockTargetHash,
+ 													 &heaptargettag,
+ 													 heaptargettaghash,
+ 													 HASH_ENTER, &found);
+ 			Assert(heaptarget != NULL);
+ 			if (!found)
+ 				SHMQueueInit(&heaptarget->predicateLocks);
+ 			newpredlocktag.myTarget = heaptarget;
+ 		}
+ 
+ 		/*
+ 		 * Loop through moving locks from this target to the relation target.
+ 		 */
+ 		oldpredlock = (PREDICATELOCK *)
+ 			SHMQueueNext(&(oldtarget->predicateLocks),
+ 						 &(oldtarget->predicateLocks),
+ 						 offsetof(PREDICATELOCK, targetLink));
+ 		while (oldpredlock)
+ 		{
+ 			PREDICATELOCK *nextpredlock;
+ 			PREDICATELOCK *newpredlock;
+ 			SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
+ 
+ 			nextpredlock = (PREDICATELOCK *)
+ 				SHMQueueNext(&(oldtarget->predicateLocks),
+ 							 &(oldpredlock->targetLink),
+ 							 offsetof(PREDICATELOCK, targetLink));
+ 			newpredlocktag.myXact = oldpredlock->tag.myXact;
+ 
+ 			/*
+ 			 * It's OK ot remove the old lock first because of the ACCESS
+ 			 * EXCLUSIVE lock on the heap relation when this is called.  It is
+ 			 * desirable to do so because it avoids any chance of running out
+ 			 * of lock structure entries for the table.
+ 			 */
+ 			SHMQueueDelete(&(oldpredlock->xactLink));
+ 			/* No need for retail delete from oldtarget list. */
+ 			hash_search(PredicateLockHash,
+ 						&oldpredlock->tag,
+ 						HASH_REMOVE, &found);
+ 			Assert(found);
+ 
+ 			if (transfer)
+ 			{
+ 				newpredlock = (PREDICATELOCK *)
+ 					hash_search_with_hash_value
+ 					(PredicateLockHash,
+ 					 &newpredlocktag,
+ 					 PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
+ 														  heaptargettaghash),
+ 					 HASH_ENTER_NULL, &found);
+ 				Assert(newpredlock != NULL);
+ 				if (!found)
+ 				{
+ 					SHMQueueInsertBefore(&(heaptarget->predicateLocks),
+ 										 &(newpredlock->targetLink));
+ 					SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
+ 										 &(newpredlock->xactLink));
+ 					newpredlock->commitSeqNo = oldCommitSeqNo;
+ 				}
+ 				else
+ 				{
+ 					if (newpredlock->commitSeqNo < oldCommitSeqNo)
+ 						newpredlock->commitSeqNo = oldCommitSeqNo;
+ 				}
+ 
+ 				Assert(newpredlock->commitSeqNo != 0);
+ 				Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
+ 					   || (newpredlock->tag.myXact == OldCommittedSxact));
+ 			}
+ 
+ 			oldpredlock = nextpredlock;
+ 		}
+ 
+ 		hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE, &found);
+ 		Assert(found);
+ 	}
+ 
+ 	if (transfer)
+ 	{
+ 		/* Put the reserved entry back */
+ 		hash_search_with_hash_value(PredicateLockTargetHash,
+ 									&ReservedTargetTag,
+ 									reservedtargettaghash,
+ 									HASH_ENTER, &found);
+ 		Assert(!found);
+ 	}
+ 
+ 	/* Release locks in reverse order */
+ 	LWLockRelease(SerializableXactHashLock);
+ 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ 		LWLockRelease(FirstPredicateLockMgrLock + i);
+ 	LWLockRelease(SerializablePredicateLockListLock);
+ }
+ 
+ /*
+  * TransferPredicateLocksToHeapRelation
+  *		For all transactions, transfer all predicate locks for the given
+  *		relation to a single relation lock on the heap.  For a heap relation
+  *		that includes all locks on indexes; for an index the same locks moves
+  *		are needed, but only if one or more locks exists on that index.
+  */
+ void
+ TransferPredicateLocksToHeapRelation(const Relation relation)
+ {
+ 	DropAllPredicateLocksFromTableImpl(relation, true);
+ }
+ 
  
  /*
   *		PredicateLockPageSplit
***************
*** 3792,3797 **** CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
--- 4100,4201 ----
  }
  
  /*
+  * CheckTableForSerializableConflictIn
+  *		The entire table is going through a DDL-style logical mass write (like
+  *		TRUNCATE TABLE or DROP TABLE.  While these operations do not operate
+  *		entirely within the bounds of snapshot isolation, they can occur
+  *		inside of a serialziable transaction, and will logically occur after
+  *		any reads which saw rows which were destroyed by these operations, so
+  *		we do what we can to serialize properly under SSI.
+  *
+  * The relation passed in must be a heap relation for a table. Any predicate
+  * lock of any granularity on the table or any of its indexes will cause a
+  * rw-conflict in to this transaction.
+  *
+  * This should be done before altering the predicate locks because the
+  * transaction could be rolled back because of a conflict, in which case the
+  * lock changes are not needed.
+  */
+ void
+ CheckTableForSerializableConflictIn(const Relation relation)
+ {
+ 	HASH_SEQ_STATUS seqstat;
+ 	PREDICATELOCKTARGET *target;
+ 	Oid			dbId;
+ 	Oid			heapId;
+ 	int			i;
+ 
+ 	/*
+ 	 * Bail out quickly if there are no serializable transactions running.
+ 	 * It's safe to check this without taking locks because the caller is
+ 	 * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
+ 	 * would matter here can be acquired while that is held.
+ 	 */
+ 	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
+ 		return;
+ 
+ 	if (SkipSerialization(relation))
+ 		return;
+ 
+ 	Assert(relation->rd_index == NULL);
+ 
+ 	dbId = relation->rd_node.dbNode;
+ 	heapId = relation->rd_id;
+ 
+ 	LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
+ 	for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
+ 		LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
+ 	LWLockAcquire(SerializableXactHashLock, LW_SHARED);
+ 
+ 	/* Scan through PredicateLockHash and copy contents */
+ 	hash_seq_init(&seqstat, PredicateLockTargetHash);
+ 
+ 	while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
+ 	{
+ 		PREDICATELOCK *predlock;
+ 
+ 		/*
+ 		 * Check whether this is a target which needs attention.
+ 		 */
+ 		if (GET_PREDICATELOCKTARGETTAG_DB(target->tag) != dbId)
+ 			continue;			/* wrong database */
+ 		if (GET_PREDICATELOCKTARGETTAG_RELATION(target->tag) != heapId
+ 			&& IfIndexGetRelation(GET_PREDICATELOCKTARGETTAG_RELATION(target->tag)) != heapId)
+ 			continue;			/* not index or index on wrong heap relation */
+ 
+ 		/*
+ 		 * Loop through locks for this target and flag conflicts.
+ 		 */
+ 		predlock = (PREDICATELOCK *)
+ 			SHMQueueNext(&(target->predicateLocks),
+ 						 &(target->predicateLocks),
+ 						 offsetof(PREDICATELOCK, targetLink));
+ 		while (predlock)
+ 		{
+ 			PREDICATELOCK *nextpredlock;
+ 
+ 			nextpredlock = (PREDICATELOCK *)
+ 				SHMQueueNext(&(target->predicateLocks),
+ 							 &(predlock->targetLink),
+ 							 offsetof(PREDICATELOCK, targetLink));
+ 
+ 			if (predlock->tag.myXact != MySerializableXact
+ 				&& !RWConflictExists(predlock->tag.myXact, (SERIALIZABLEXACT *) MySerializableXact))
+ 				FlagRWConflict(predlock->tag.myXact, (SERIALIZABLEXACT *) MySerializableXact);
+ 
+ 			predlock = nextpredlock;
+ 		}
+ 	}
+ 
+ 	/* Release locks in reverse order */
+ 	LWLockRelease(SerializableXactHashLock);
+ 	for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
+ 		LWLockRelease(FirstPredicateLockMgrLock + i);
+ 	LWLockRelease(SerializablePredicateLockListLock);
+ }
+ 
+ 
+ /*
   * Flag a rw-dependency between two serializable transactions.
   *
   * The caller is responsible for ensuring that we have a LW lock on
***************
*** 3814,3820 **** FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
  		SetRWConflict(reader, writer);
  }
  
! /*
   * We are about to add a RW-edge to the dependency graph - check that we don't
   * introduce a dangerous structure by doing so, and abort one of the
   * transactions if so.
--- 4218,4224 ----
  		SetRWConflict(reader, writer);
  }
  
! /*----------------------------------------------------------------------------
   * We are about to add a RW-edge to the dependency graph - check that we don't
   * introduce a dangerous structure by doing so, and abort one of the
   * transactions if so.
***************
*** 3823,3835 **** FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
   * in the dependency graph:
   *
   *		Tin ------> Tpivot ------> Tout
!  *	   		  rw			 rw
   *
   * Furthermore, Tout must commit first.
   *
   * One more optimization is that if Tin is declared READ ONLY (or commits
   * without writing), we can only have a problem if Tout committed before Tin
   * acquired its snapshot.
   */
  static void
  OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
--- 4227,4240 ----
   * in the dependency graph:
   *
   *		Tin ------> Tpivot ------> Tout
!  *			  rw			 rw
   *
   * Furthermore, Tout must commit first.
   *
   * One more optimization is that if Tin is declared READ ONLY (or commits
   * without writing), we can only have a problem if Tout committed before Tin
   * acquired its snapshot.
+  *----------------------------------------------------------------------------
   */
  static void
  OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
***************
*** 3842,3873 **** OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
  
  	failure = false;
  
! 	/*
  	 * Check for already-committed writer with rw-conflict out flagged
  	 * (conflict-flag on W means that T2 committed before W):
  	 *
  	 *		R ------> W ------> T2
! 	 *			rw        rw
  	 *
  	 * That is a dangerous structure, so we must abort. (Since the writer
  	 * has already committed, we must be the reader)
  	 */
  	if (SxactIsCommitted(writer)
  	  && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
  		failure = true;
  
! 	/*
  	 * Check whether the writer has become a pivot with an out-conflict
  	 * committed transaction (T2), and T2 committed first:
  	 *
  	 *		R ------> W ------> T2
! 	 *			rw        rw
  	 *
  	 * Because T2 must've committed first, there is no anomaly if:
  	 * - the reader committed before T2
  	 * - the writer committed before T2
  	 * - the reader is a READ ONLY transaction and the reader was concurrent
  	 *	 with T2 (= reader acquired its snapshot before T2 committed)
  	 */
  	if (!failure)
  	{
--- 4247,4280 ----
  
  	failure = false;
  
! 	/*------------------------------------------------------------------------
  	 * Check for already-committed writer with rw-conflict out flagged
  	 * (conflict-flag on W means that T2 committed before W):
  	 *
  	 *		R ------> W ------> T2
! 	 *			rw		  rw
  	 *
  	 * That is a dangerous structure, so we must abort. (Since the writer
  	 * has already committed, we must be the reader)
+ 	 *------------------------------------------------------------------------
  	 */
  	if (SxactIsCommitted(writer)
  	  && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
  		failure = true;
  
! 	/*------------------------------------------------------------------------
  	 * Check whether the writer has become a pivot with an out-conflict
  	 * committed transaction (T2), and T2 committed first:
  	 *
  	 *		R ------> W ------> T2
! 	 *			rw		  rw
  	 *
  	 * Because T2 must've committed first, there is no anomaly if:
  	 * - the reader committed before T2
  	 * - the writer committed before T2
  	 * - the reader is a READ ONLY transaction and the reader was concurrent
  	 *	 with T2 (= reader acquired its snapshot before T2 committed)
+ 	 *------------------------------------------------------------------------
  	 */
  	if (!failure)
  	{
***************
*** 3891,3897 **** OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
  				&& (!SxactIsCommitted(writer)
  					|| t2->commitSeqNo <= writer->commitSeqNo)
  				&& (!SxactIsReadOnly(reader)
! 					|| t2->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
  			{
  				failure = true;
  				break;
--- 4298,4304 ----
  				&& (!SxactIsCommitted(writer)
  					|| t2->commitSeqNo <= writer->commitSeqNo)
  				&& (!SxactIsReadOnly(reader)
! 			   || t2->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
  			{
  				failure = true;
  				break;
***************
*** 3903,3918 **** OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
  		}
  	}
  
! 	/*
  	 * Check whether the reader has become a pivot with a committed writer:
  	 *
  	 *		T0 ------> R ------> W
! 	 *			 rw        rw
  	 *
  	 * Because W must've committed first for an anomaly to occur, there is no
  	 * anomaly if:
  	 * - T0 committed before the writer
  	 * - T0 is READ ONLY, and overlaps the writer
  	 */
  	if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
  	{
--- 4310,4326 ----
  		}
  	}
  
! 	/*------------------------------------------------------------------------
  	 * Check whether the reader has become a pivot with a committed writer:
  	 *
  	 *		T0 ------> R ------> W
! 	 *			 rw		   rw
  	 *
  	 * Because W must've committed first for an anomaly to occur, there is no
  	 * anomaly if:
  	 * - T0 committed before the writer
  	 * - T0 is READ ONLY, and overlaps the writer
+ 	 *------------------------------------------------------------------------
  	 */
  	if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
  	{
***************
*** 3934,3940 **** OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
  				&& (!SxactIsCommitted(t0)
  					|| t0->commitSeqNo >= writer->commitSeqNo)
  				&& (!SxactIsReadOnly(t0)
! 					|| t0->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
  			{
  				failure = true;
  				break;
--- 4342,4348 ----
  				&& (!SxactIsCommitted(t0)
  					|| t0->commitSeqNo >= writer->commitSeqNo)
  				&& (!SxactIsReadOnly(t0)
! 			   || t0->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
  			{
  				failure = true;
  				break;
***************
*** 3950,3957 **** OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
  	{
  		/*
  		 * We have to kill a transaction to avoid a possible anomaly from
! 		 * occurring. If the writer is us, we can just ereport() to cause
! 		 * a transaction abort. Otherwise we flag the writer for termination,
  		 * causing it to abort when it tries to commit. However, if the writer
  		 * is a prepared transaction, already prepared, we can't abort it
  		 * anymore, so we have to kill the reader instead.
--- 4358,4365 ----
  	{
  		/*
  		 * We have to kill a transaction to avoid a possible anomaly from
! 		 * occurring. If the writer is us, we can just ereport() to cause a
! 		 * transaction abort. Otherwise we flag the writer for termination,
  		 * causing it to abort when it tries to commit. However, if the writer
  		 * is a prepared transaction, already prepared, we can't abort it
  		 * anymore, so we have to kill the reader instead.
***************
*** 3962,3968 **** OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
  			ereport(ERROR,
  					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
  					 errmsg("could not serialize access due to read/write dependencies among transactions"),
! 			errdetail("Cancelled on identification as a pivot, during write."),
  					 errhint("The transaction might succeed if retried.")));
  		}
  		else if (SxactIsPrepared(writer))
--- 4370,4376 ----
  			ereport(ERROR,
  					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
  					 errmsg("could not serialize access due to read/write dependencies among transactions"),
! 					 errdetail("Cancelled on identification as a pivot, during write."),
  					 errhint("The transaction might succeed if retried.")));
  		}
  		else if (SxactIsPrepared(writer))
*** a/src/include/storage/predicate.h
--- b/src/include/storage/predicate.h
***************
*** 49,59 **** extern void PredicateLockPage(const Relation relation, const BlockNumber blkno);
--- 49,61 ----
  extern void PredicateLockTuple(const Relation relation, const HeapTuple tuple);
  extern void PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
  extern void PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
+ extern void TransferPredicateLocksToHeapRelation(const Relation relation);
  extern void ReleasePredicateLocks(const bool isCommit);
  
  /* conflict detection (may also trigger rollback) */
  extern void CheckForSerializableConflictOut(const bool valid, const Relation relation, const HeapTuple tuple, const Buffer buffer);
  extern void CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, const Buffer buffer);
+ extern void CheckTableForSerializableConflictIn(const Relation relation);
  
  /* final rollback checking */
  extern void PreCommit_CheckForSerializationFailure(void);
