Hi,

I've been reviewing the last patch version, focusing mostly on the
decoding group part. Let me respond to several points first, then new
review bits.

On 03/28/2018 05:28 PM, Nikhil Sontakke wrote:
> Hi Tomas,
> 
>> Now, about the interlock implementation - I see you've reused the "lock
>> group" concept from parallel query. That may make sense, unfortunately
>> there's about no documentation explaining how it works, what is the
>> "protocol" etc. There is fairly extensive documentation for "lock
>> groups" in src/backend/storage/lmgr/README, but while the "decoding
>> group" code is inspired by it, the code is actually very different.
>> Compare for example BecomeLockGroupLeader and BecomeDecodeGroupLeader,
>> and you'll see what I mean.
>>
>> So I think the first thing we need to do is add proper documentation
>> (possibly into the same README), explaining how the decode groups work,
>> how the decodeAbortPending works, etc.
>>
> 
> I have added details about this in src/backend/storage/lmgr/README as
> suggested by you.
> 

Thanks. I think the README is a good start, but we also need to
improve the comments, which are usually more detailed than the README.
For example, it's not quite acceptable that LogicalLockTransaction and
LogicalUnlockTransaction have almost no comments, especially when they're
meant to be public API for decoding plugins.

>>
>> BTW, do we need to do any of this with (wal_level < logical)? I don't
>> see any quick bail-out in any of the functions in this case, but it
>> seems like a fairly obvious optimization.
>>
> 
> The calls to the LogicalLockTransaction/LogicalUnLockTransaction APIs
> will be from inside plugins or the reorderbuffer code paths. Those
> will get invoked only in the wal_level logical case, hence I did not
> add further checks.
> 

Oh, right.

>> Similarly, can't the logical workers indicate that they need to decode
>> 2PC transactions (or in-progress transactions in general) in some way?
>> If we knew there are no such workers, that would also allow ignoring the
>> interlock, no?
>>
> 
> These APIs check if the transaction is already committed and cache
> that information for further calls, so for regular transactions this
> becomes a no-op
> 

I see. So when the output plugin never calls LogicalLockTransaction on
an in-progress transaction (e.g. 2PC after PREPARE), it never actually
initializes the decoding group. Works for me.

>>
>> 2) regression tests
>> -------------------
>>
>> I really dislike the use of \set to run the same query repeatedly. It
>> makes analysis of regression failures even more tedious than it already
>> is. I'd just copy the query to all the places.
>>
> 
> They are long-winded queries and IMO made the test file look too
> cluttered and verbose..
> 

Well, I don't think the verbosity is a major problem, while \set certainly
makes it more difficult to investigate regression failures.

>>
>> 3) worker.c
>> -----------
>>
>> The comment in apply_handle_rollback_prepared_txn says this:
>>
>>   /*
>>    * During logical decoding, on the apply side, it's possible that a
>>    * prepared transaction got aborted while decoding. In that case, we
>>    * stop the decoding and abort the transaction immediately. However
>>    * the ROLLBACK prepared processing still reaches the subscriber. In
>>    * that case it's ok to have a missing gid
>>    */
>>   if (LookupGXact(commit_data->gid)) { ... }
>>
>> But is it safe to assume it never happens due to an error? In other
>> words, is there a way to decide that the GID really aborted? Or, why
>> should the provider send the rollback at all - surely it could know if
>> the transaction/GID was sent to subscriber or not, right?
>>
> 
> Since we decode in commit WAL order, when we reach the ROLLBACK
> PREPARED wal record, we cannot be sure that we did in fact abort the
> decoding midway because of this concurrent rollback. It's possible
> that this rollback comes much much later as well when all decoding
> backends have successfully prepared it on the subscribers already.
> 

Ah, OK. So when the transaction gets aborted (by ROLLBACK PREPARED)
concurrently with the decoding, we abort the apply transaction and
discard the ReorderBufferTXN.

Which means that later, when we decode the abort, we don't know whether
the decoding reached abort or prepare, and so we have to send the
ROLLBACK PREPARED to the subscriber too.

For a moment I was thinking we might simply remember TXN outcome in
reorder buffer, but obviously that does not work - the decoding might
restart in between, and as you say the distance (in terms of WAL) may be
quite significant.
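
So the subscriber has to tolerate a ROLLBACK PREPARED for a GID it never
prepared. A minimal mock of that apply-side behavior might look like
this; the set of prepared GIDs and both function names are hypothetical
stand-ins (sketch_find_gid plays the role of LookupGXact), not the code
in worker.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define SKETCH_MAX_GIDS 8

/* Mock of the prepared transactions known on the subscriber. */
typedef struct SketchPrepared
{
    const char *gids[SKETCH_MAX_GIDS];
    int         ngids;
} SketchPrepared;

/* Stand-in for LookupGXact: index of the GID, or -1 if it is missing. */
static int
sketch_find_gid(const SketchPrepared *set, const char *gid)
{
    for (int i = 0; i < set->ngids; i++)
    {
        if (strcmp(set->gids[i], gid) == 0)
            return i;
    }
    return -1;
}

/*
 * Apply ROLLBACK PREPARED. Returns true if a prepared transaction was
 * rolled back, false if the GID was missing and the message was ignored.
 */
static bool
sketch_apply_rollback_prepared(SketchPrepared *set, const char *gid)
{
    int         idx;

    assert(set != NULL && gid != NULL);

    idx = sketch_find_gid(set, gid);
    if (idx < 0)
        return false;           /* decoding was aborted before PREPARE */

    /* Remove the entry by moving the last one into its place. */
    set->gids[idx] = set->gids[set->ngids - 1];
    set->ngids--;
    return true;
}
```

Note the mock cannot tell a legitimately missing GID from one lost due to
an error, which is exactly the concern I raised.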

>>
>> 7) proto.c / worker.c
>> ---------------------
>>
>> Until now, the 'action' (essentially the first byte of each message)
>> clearly identified what the message does. So 'C' -> commit, 'I' ->
>> insert, 'D' -> delete etc. This also means the "handle" methods were
>> inherently simple, because each handled exactly one particular action
>> and nothing else.
>>
>> You've expanded the protocol in a way that suddenly 'C' means either
>> COMMIT or ROLLBACK, and 'P' means PREPARE, ROLLBACK PREPARED or COMMIT
>> PREPARED. I don't think that's how the protocol should be extended - if
>> anything, it's damn confusing and unlike the existing code. You should
>> define new action, and keep the handlers in worker.c simple.
>>
> 
> I thought this grouped regular commit and 2PC transactions properly.
> Can look at this again if this style is not favored.
> 

Hmmm, it's not how I'd do it, but perhaps someone who originally
designed the protocol should review this bit.
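
For the record, what I had in mind is roughly the following, with one
action byte per message type and a trivial one-to-one dispatch. The 'C',
'I' and 'D' bytes are the existing ones mentioned above; the bytes used
for the three 2PC actions are placeholders I picked for this sketch, not
something the patch defines.

```c
#include <assert.h>
#include <string.h>

/*
 * Map an action byte to the name of the single handler responsible for
 * it. No handler has to inspect the message further to learn what kind
 * of message it received.
 */
static const char *
sketch_handler_for(char action)
{
    switch (action)
    {
        case 'C':
            return "commit";
        case 'I':
            return "insert";
        case 'D':
            return "delete";

            /* Hypothetical new action bytes, one per 2PC message. */
        case 'P':
            return "prepare";
        case 'K':
            return "commit prepared";
        case 'r':
            return "rollback prepared";

        default:
            return "unknown";
    }
}
```

With this shape, apply_handle_commit never has to branch on "is this
really a rollback", and the same holds for the prepare handler.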


Now, the new bits ... attached is a .diff with a couple of changes and
comments on various places.

1) LogicalLockTransaction

- This function is part of a public API, yet it has no comment. That
needs fixing - it has to be clear how to use it. The .diff suggests a
comment, but it may need improvements.
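
To illustrate what the comment has to convey to plugin authors, here is
a self-contained mock of the expected calling pattern. Everything in it
is a stand-in: SketchTxn replaces ReorderBufferTXN, sketch_lock and
sketch_unlock replace LogicalLockTransaction and
LogicalUnlockTransaction, and the callback is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Mock transaction state; stands in for ReorderBufferTXN. */
typedef struct SketchTxn
{
    bool        aborting;       /* a concurrent abort has started */
    int         lock_depth;     /* lock calls not yet matched by unlock */
    int         catalog_reads;  /* catalog accesses performed so far */
} SketchTxn;

/* Stand-in for LogicalLockTransaction. */
static bool
sketch_lock(SketchTxn *txn)
{
    if (txn->aborting)
        return false;
    txn->lock_depth++;
    return true;
}

/* Stand-in for LogicalUnlockTransaction. */
static void
sketch_unlock(SketchTxn *txn)
{
    assert(txn->lock_depth > 0);
    txn->lock_depth--;
}

/*
 * Hypothetical plugin callback. Returns false when the plugin should
 * stop decoding this transaction.
 */
static bool
sketch_change_cb(SketchTxn *txn)
{
    if (!sketch_lock(txn))
        return false;

    txn->catalog_reads++;       /* catalog access happens only here */

    sketch_unlock(txn);
    return true;
}
```

The two points the real comment should spell out are that the locked
section has to be as short as possible, and that a false return means
"stop decoding this transaction", not "retry".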


- As I mentioned in the previous review, BecomeDecodeGroupLeader is a
misleading name. It suggests the caller becomes the leader, while in fact
it looks up the PROC running the XID and makes it the leader. This is
obviously due to copying the code from lock groups, where the caller
actually becomes the leader. It's incorrect here. I suggest
LookupDecodeGroupLeader() or something similar.


- In the "if (MyProc->decodeGroupLeader == NULL)" block there are two
blocks rechecking the transaction status:

  if (proc == NULL)
  { ... recheck ... }

  if (!BecomeDecodeGroupMember(proc, proc->pid, rbtxn_prepared(txn)))
  { ... recheck ...}

I suggest merging them into a single block.
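
A mock of the merged shape, relying on short-circuit evaluation so the
join is never attempted with a NULL leader. The types, the status enum
and the function names are made up for this sketch; in the patch the
recheck would consult the actual transaction status.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Mock of the leader PGPROC; only the fields the sketch needs. */
typedef struct SketchProc
{
    bool        abort_pending;
    int         nmembers;
} SketchProc;

typedef enum
{
    TXN_IN_PROGRESS,
    TXN_COMMITTED,
    TXN_ABORTED
} SketchStatus;

/* Stand-in for BecomeDecodeGroupMember. */
static bool
sketch_join(SketchProc *leader)
{
    if (leader->abort_pending)
        return false;
    leader->nmembers++;
    return true;
}

/*
 * Slow path of the lock call. Returns true when catalog access is safe;
 * *joined tells the caller whether it must leave the group later.
 */
static bool
sketch_lock_slow_path(SketchProc *leader, SketchStatus status_now,
                      bool *joined)
{
    *joined = false;

    /* One block handles both "no leader" and "could not join". */
    if (leader == NULL || !sketch_join(leader))
    {
        /* The transaction must have finished one way or the other. */
        assert(status_now != TXN_IN_PROGRESS);
        return status_now == TXN_COMMITTED;
    }

    *joined = true;
    return true;
}
```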


- This Assert() is either bogus and there can indeed be cases with
(MyProc->decodeGroupLeader==NULL), or the "if" is unnecessary:

    Assert(MyProc->decodeGroupLeader);

    if (MyProc->decodeGroupLeader) { ... }

- I'm wondering why we're maintaining decodeAbortPending flags both for
the leader and all the members. ISTM it'd be perfectly fine to only
check the leader, particularly because RemoveDecodeGroupMemberLocked
removes the members from the decoding group. So that seems unnecessary,
and we can remove the

    if (MyProc->decodeAbortPending)
    { ... }

- LogicalUnlockTransaction needs comments too.


2) BecomeDecodeGroupLeader

- Wrong name (already mentioned above).

- It can bail out when (!proc), which will simplify the code a bit.

- Why does it check PID of the process at all? Seems unnecessary,
considering we're already checking the XID.

- Can a proc executing an XID have a different leader? I don't think so,
so I'd make that an Assert().

    Assert(!proc || (proc->decodeGroupLeader == proc));

And it'll allow simplification of some of the conditions.

- We're only dealing with prepared transactions now, so I'd just drop
the is_prepared flag - it'll make the code a bit simpler, and we can add it
back later in the patch adding decoding of regular in-progress
transactions. We can't test the (!is_prepared) case anyway.

- Why are we making the leader also a member of the group? Seems rather
unnecessary, and it complicates the abort handling, because we need to
skip the leader when deciding to wait.


3) LogicalDecodeRemoveTransaction

- It's not clear to me what happens when a decoding backend gets killed
between LogicalLockTransaction/LogicalUnlockTransaction. Doesn't that
mean LogicalDecodeRemoveTransaction will get stuck, because the proc is
still in the decoding group?

- The loop now tweaks decodeAbortPending of the members, but I don't
think that's necessary either - the LogicalUnlockTransaction can check
the leader flag just as easily.
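
To show what I mean by keeping the flag only on the leader (and not
making the leader a member of its own group), here is a single-process
mock. It ignores locking and the actual waiting entirely, and all names
are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Mock of the per-leader state; members keep no abort flag of their own. */
typedef struct SketchGroup
{
    bool        abort_pending;  /* set only by the aborting backend */
    int         locked_members; /* decoding backends inside lock/unlock */
} SketchGroup;

/* Decoding backend: join unless an abort is already pending. */
static bool
sketch_member_lock(SketchGroup *group)
{
    if (group->abort_pending)
        return false;
    group->locked_members++;
    return true;
}

/*
 * Decoding backend: leave the group. Returns false if the transaction is
 * being aborted, in which case the caller should stop decoding it.
 */
static bool
sketch_member_unlock(SketchGroup *group)
{
    assert(group->locked_members > 0);
    group->locked_members--;
    return !group->abort_pending;
}

/*
 * Aborting backend: announce the abort and report whether it has to
 * wait. There is no leader entry to skip, so the test is just a count.
 */
static bool
sketch_abort_must_wait(SketchGroup *group)
{
    group->abort_pending = true;
    return group->locked_members > 0;
}
```

The abort side then only has to re-evaluate the count after each wakeup,
and nobody has to walk the member list to set flags.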


4) a bunch of comment / docs improvements, ...

I'm suggesting rewording a couple of comments. I've also added a couple
of missing comments - e.g. to LogicalLockTransaction and the lock group
methods in general.

Also, a couple more questions and suggestions in XXX comments.

regards

-- 
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e2db0eb..8fbd8b8 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -551,8 +551,7 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
 
 /*
  * LookupGXact
- *		Check if the prepared transaction with the given GID is
- *		around
+ *		Check if the prepared transaction with the given GID is around
  */
 bool
 LookupGXact(const char *gid)
@@ -1538,9 +1537,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 									   gid);
 
 	ProcArrayRemove(proc, latestXid);
+
 	/*
-	 * Tell logical decoding backends interested in this XID
-	 * that this is going away
+	 * Coordinate with logical decoding backends that may be already
+	 * decoding this prepared transaction. When aborting a transaction,
+	 * we need to wait for all of them to leave the decoding group. If
+	 * committing, we simply remove all members from the group.
 	 */
 	LogicalDecodeRemoveTransaction(proc, isCommit);
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8025d99..c378157 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1220,45 +1220,90 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	}
 }
 
+/*
+ * LogicalLockTransaction
+ *		Make sure the transaction is not aborted during decoding.
+ *
+ * The logical decoding plugins may need to access catalogs (both system
+ * and user-defined), e.g. to get metadata about tuples, do custom
+ * filtering etc. While decoding committed transactions that is not an
+ * issue, but in-progress transactions may abort while being decoded, in
+ * which case the catalog access may fail in various ways (rows from
+ * aborted transactions are eligible for more aggressive cleanup, may
+ * not be accessible through indexes due to breaking HOT chains etc.).
+ *
+ * To prevent these issues, we need to prevent abort of the transaction
+ * while accessing any catalogs. To enforce that, each decoding backend
+ * has to call LogicalLockTransaction prior to any catalog access, and
+ * then LogicalUnlockTransaction immediately after it. These functions
+ * add/remove the decoding backend from a "decoding group" for a given
+ * transaction. While aborting a prepared transaction, the backend will
+ * wait for all current members of the decoding group to leave (see
+ * LogicalDecodeRemoveTransaction).
+ *
+ * The function returns true when it's safe to access catalogs, and
+ * false when the transaction aborted (or is being aborted) in which
+ * case the plugin should stop decoding it.
+ *
+ * The decoding backend joins the decoding group only when actually
+ * needed. For example when the transaction did no catalog changes,
+ * or when it's known to already have committed (or aborted), we can
+ * bail out without joining the group.
+ */
 bool
 LogicalLockTransaction(ReorderBufferTXN *txn)
 {
 	bool	ok = false;
 
 	/*
-	 * Prepared transactions and uncommitted transactions
-	 * that have modified catalogs need to interlock with
-	 * concurrent rollback to ensure that there are no
-	 * issues while decoding
+	 * Transactions that have not modified catalogs do not need to
+	 * join the decoding group.
 	 */
-
 	if (!rbtxn_has_catalog_changes(txn))
 		return true;
 
 	/*
-	 * Is it a prepared txn? Similar checks for uncommitted
-	 * transactions when we start supporting them
+	 * Currently, only 2PC transactions can be decoded before commit
+	 * (at prepare). So regular transactions are automatically safe.
+	 *
+	 * XXX This may be unnecessary, because regular transactions
+	 * will be detected as committed.
 	 */
 	if (!rbtxn_prepared(txn))
 		return true;
 
-	/* check cached status */
+	/*
+	 * Check commit status. If a transaction already committed, there
+	 * is no danger when accessing catalogs. If it aborted, we can
+	 * stop decoding it right away.
+	 */
 	if (rbtxn_commit(txn))
 		return true;
+
 	if (rbtxn_rollback(txn))
 		return false;
 
 	/*
-	 * Find the PROC that is handling this XID and add ourself as a
-	 * decodeGroupMember
+	 * Find the PROC handling this XID and join the decoding group.
+	 *
+	 * If this is the first call for this XID, we don't know which
+	 * PROC is executing the transaction (and acting as a leader).
+	 * In that case we need to look up the leader.
 	 */
 	if (MyProc->decodeGroupLeader == NULL)
 	{
+		/*
+		 * FIXME The name is wrong - we're not becoming group leader,
+		 * we're looking up the PROC that executes the transaction and
+		 * making it the leader.
+		 */
 		PGPROC *proc = BecomeDecodeGroupLeader(txn->xid, rbtxn_prepared(txn));
 
 		/*
-		 * If decodeGroupLeader is NULL, then the only possibility
-		 * is that the transaction completed and went away
+		 * We have checked if the transaction committed/aborted, but it
+		 * is possible the PROC went away since then, in which case we
+		 * get decodeGroupLeader=NULL. We recheck transaction status,
+		 * expecting it to be either committed or aborted.
 		 */
 		if (proc == NULL)
 		{
@@ -1275,7 +1320,18 @@ LogicalLockTransaction(ReorderBufferTXN *txn)
 			}
 		}
 
-		/* Add ourself as a decodeGroupMember */
+		/*
+		 * Join the decoding group for the leader process.
+		 *
+		 * We're not holding any locks on PGPROC, so it's possible the
+		 * leader disappears, or starts executing another transaction.
+		 * In that case we're done.
+		 *
+		 * XXX Why not merge those two blocks? Something like
+		 *
+		 * if ((proc == NULL) || (!BecomeDecodeGroupMember(proc, proc->pid, rbtxn_prepared(txn))))
+		 * { ... recheck xact status ...}
+		 */
 		if (!BecomeDecodeGroupMember(proc, proc->pid, rbtxn_prepared(txn)))
 		{
 			Assert(!TransactionIdIsInProgress(txn->xid));
@@ -1294,7 +1350,10 @@ LogicalLockTransaction(ReorderBufferTXN *txn)
 
 	/*
 	 * If we were able to add ourself, then Abort processing will
-	 * interlock with us. Check if the transaction is still around
+	 * interlock with us. Check if the transaction is still around.
+	 *
+	 * XXX Eh? If the Assert() enforces (decodeGroupLeader!=NULL),
+	 * then why is there an if condition?
 	 */
 	Assert(MyProc->decodeGroupLeader);
 
@@ -1304,6 +1363,12 @@ LogicalLockTransaction(ReorderBufferTXN *txn)
 
 		leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
 		LWLockAcquire(leader_lwlock, LW_SHARED);
+
+		/*
+		 * XXX Why check this again? BecomeDecodeGroupMember already
+		 * checks the flag on the leader, and returns false if it's set
+		 * to true.
+		 */
 		if (MyProc->decodeAbortPending)
 		{
 			/*
@@ -1339,12 +1404,21 @@ LogicalUnlockTransaction(ReorderBufferTXN *txn)
 	LWLock	   *leader_lwlock;
 
 	/*
+	 * If the transaction is known to have aborted, we should never have got
+	 * here (the plugin should have interrupted the decoding).
+	 */
+	Assert(!rbtxn_rollback(txn));
+
+	/* If it's not locked, we're done. */
+	if (!MyProc->decodeLocked)
+		return;
+
+	/*
 	 * Prepared transactions and uncommitted transactions
 	 * that have modified catalogs need to interlock with
 	 * concurrent rollback to ensure that there are no
 	 * issues while decoding
 	 */
-
 	if (!rbtxn_has_catalog_changes(txn))
 		return;
 
@@ -1358,8 +1432,6 @@ LogicalUnlockTransaction(ReorderBufferTXN *txn)
 	/* check cached status */
 	if (rbtxn_commit(txn))
 		return;
-	if (rbtxn_rollback(txn))
-		return;
 
 	Assert(MyProc->decodeGroupLeader);
 	leader_lwlock = LockHashPartitionLockByProc(MyProc->decodeGroupLeader);
@@ -1372,8 +1444,12 @@ LogicalUnlockTransaction(ReorderBufferTXN *txn)
 		LWLockRelease(leader_lwlock);
 		LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
 		RemoveDecodeGroupMemberLocked(MyProc->decodeGroupLeader);
-		/* reset the bool to let the leader know that we are going away */
+
+		/* reset the bool to let the leader know that we are going away
+		 * XXX Why? Just removing ourselves from the group should be enough.
+		 */
 		MyProc->decodeAbortPending = false;
+
 		txn->txn_flags |= RBTXN_ROLLBACK;
 	}
 	MyProc->decodeLocked = false;
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index 9742a34..8c25eda 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -682,30 +682,37 @@ PIDs are not recycled quickly enough for this interlock to fail.
 Decode Group Locking
 --------------------
 
-We use an infrastructure which is very similar to the above group locking
-of parallel processes to create a group of backends that are performing
-logical decoding of an uncommitted or a prepared transaction.
-
-Decode Group locking adds five new members to each PGPROC:
+When decoding in-progress transactions, we need to prevent aborts while
+the decoding processes are accessing catalogs, which might lead to issues
+if the transaction modified some of the catalogs.  Currently this applies
+only to two-phase transactions, that may be decoded at PREPARE time, but
+in the future this may be extended to regular transactions too.
+
+To prevent that, the backend executing the abort is made to wait for all
+the decoding backends.  We use an infrastructure which is very similar
+to the above group locking to form groups of backends performing logical
+decoding of the same in-progress transaction.
+
+Decode Group locking adds five new members to each PGPROC:
 decodeGroupLeader, decodeGroupMembers, decodeGroupLink, decodeLocked and
 decodeAbortPending.  A PGPROC's decodeGroupLeader is NULL for processes
 not involved in logical decoding.  When a process wants to decode an
-uncommitted or prepared transaction then it finds out the PGPROC
-structure which is associated with that transaction id and makes that
-PGPROC structure as its decodeGroupLeader.  The decodeGroupMembers field
-is only used in the leader; it is a list of the member PGPROCs of the
-decode group (the leader and all backends decoding this transaction id).
+in-progress transaction then it finds out the PGPROC structure which is
+associated with that transaction ID and makes that PGPROC structure
+its decodeGroupLeader.  The decodeGroupMembers field is only used in the
+leader; it is a list of the member PGPROCs of the decode group (the
+leader and all backends decoding this transaction ID). 
 The decodeGroupLink field is the list link for this list. The decoding
 backend marks itself as decodeLocked while it is accessing catalog
-metadata for its decoding requirements via the
-LogicalLockTransaction API.  It resets the same via the
-LogicalUnlockTransaction API. Meanwhile, if the transaction id of this
-uncommitted or prepared transaction decides to abort then the PGPROC
-structure corresponding to it sets decodeAbortPending on itself and also
-on all the decodeGroupMembers entries.  The decodeGroupMembers entries
-stop decoding of this aborted transaction and exit. When all the
-decoding backends have exited then the aborting transaction goes ahead
-with its regular processing.
+metadata for its decoding requirements via the LogicalLockTransaction
+API.  It resets the same via the LogicalUnlockTransaction API.
+
+Meanwhile, if the transaction ID of this in-progress transaction decides
+to abort, then the PGPROC corresponding to it sets decodeAbortPending
+on itself and also on all the decodeGroupMembers entries.
+
+The decodeGroupMembers entries stop decoding this transaction and exit.
+When all the decoding backends have exited the abort can proceed.
 
 All five of these fields are considered to be protected by a lock manager
 partition lock.  The partition lock that protects these fields within a given
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 26d35c7..f72ea37 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -1906,10 +1906,18 @@ BecomeLockGroupMember(PGPROC *leader, int pid)
 }
 
 /*
- * BecomeDecodeGroupLeader - designate process as decode group leader
+ * BecomeDecodeGroupLeader
+ *		Designate process as decode group leader.
  *
- * Once this function has returned, other processes can join the decode group
- * by calling BecomeDecodeGroupMember.
+ * Once this function has returned, other processes can join the decode
+ * group by calling BecomeDecodeGroupMember.
+ *
+ * XXX Should be LookupDecodeGroupLeader() or something like that. For
+ * the lock groups the "become" makes sense, because the caller is the
+ * first process that also acts as a leader. Not like here.
+ *
+ * XXX We're only handling prepared transactions now, so let's get rid
+ * of the is_prepared flag. We can't check is_prepared=false anyway.
  */
 PGPROC *
 BecomeDecodeGroupLeader(TransactionId xid, bool is_prepared)
@@ -1920,32 +1928,95 @@ BecomeDecodeGroupLeader(TransactionId xid, bool is_prepared)
 
 	Assert(xid != InvalidTransactionId);
 
-
+	/*
+	 * Look up the backend executing this transaction.
+	 *
+	 * XXX If the transaction already completed, we can bail out. That
+	 * is, we can do
+	 * 
+	 * if (!proc)
+	 *		return NULL;
+	 */
 	proc = BackendXidGetProc(xid);
 	if (proc)
 		pid = proc->pid;
 
 	/*
-	 * This proc will become decodeGroupLeader if it's
-	 * not already
+	 * A process running an XID can't have a leader, it can only be
+	 * a leader (in which case it points to itself).
+	 */
+	Assert(!proc || (proc->decodeGroupLeader == proc));
+
+	/*
+	 * This proc will become decodeGroupLeader if it's not already.
+	 *
+	 * XXX How could the proc have (decodeGroupLeader != NULL) and
+	 * (decodeGroupLeader != proc)? That is, why not make the
+	 * condition (proc && proc->decodeGroupLeader != NULL). Perhaps
+	 * if we don't clean it up correctly at transaction end?
+	 *
+	 * XXX Why not make this into
+	 *
+	 * if (proc->decodeGroupLeader != NULL)
+	 * 		return;
+	 *
+	 * or
+	 *
+	 * if (proc->decodeGroupLeader == NULL)
+	 * {
+	 *     ...
+	 * }
 	 */
 	if (proc && proc->decodeGroupLeader != proc)
 	{
 		volatile PGXACT *pgxact;
+
 		/* Create single-member group, containing this proc. */
 		leader_lwlock = LockHashPartitionLockByProc(proc);
 		LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
+
 		/* recheck we are still the same */
 		pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
+
+		/*
+		 * We know the process was executing the XID a while ago, but we
+		 * have not been holding any locks on PGPROC so it might have
+		 * started executing something else since then. So we need to
+		 * recheck that it is indeed still running the right XID.
+		 *
+		 * If it's not, the transaction must have already completed, so
+		 * we don't need to create any decoding group.
+		 *
+		 * XXX Why the check on PID? BecomeLockGroupMember does that,
+		 * but that does not mean we need to (it's not a parameter).
+		 */
 		if (proc->pid == pid && pgxact->xid == xid)
 		{
+			/*
+			 * XXX Seems unnecessary, and probably should be more like
+			 *
+			 * Assert((is_prepared && (pid == 0)) || (!is_prepared && (pid != 0)));
+			 *
+			 * Also, we only handle prepared xacts now anyway.
+			 */
 			if (is_prepared)
 				Assert(pid == 0);
+
+			/*
+			 * Some other decoding backend might have marked the process
+			 * as a leader before we acquired the lock. But it must not
+			 * be a follower of some other leader.
+			 */
+			Assert((proc->decodeGroupLeader == NULL) ||
+				   (proc->decodeGroupLeader == proc));
+
 			/* recheck if someone else did not already assign us */
-			if (proc->decodeGroupLeader != proc)
+			if (proc->decodeGroupLeader == NULL)
 			{
-				/* We had better not be a follower. */
-				Assert(proc->decodeGroupLeader == NULL);
+				/*
+				 * XXX Why do we make the leader also a member? Doesn't
+				 * it just complicate the processing later?
+				 */
 				proc->decodeGroupLeader = proc;
 				dlist_push_head(&proc->decodeGroupMembers,
 								&proc->decodeGroupLink);
@@ -2023,9 +2094,8 @@ BecomeDecodeGroupMember(PGPROC *leader, int pid, bool is_prepared)
 }
 
 /*
- * Remove a decodeGroupMember from the decodeGroupMembership of
- * decodeGroupLeader
- * Acquire lock
+ * RemoveDecodeGroupMember
+ *		Remove a member from the decoding group of a leader.
  */
 void
 RemoveDecodeGroupMember(PGPROC *leader)
@@ -2041,9 +2111,10 @@ RemoveDecodeGroupMember(PGPROC *leader)
 }
 
 /*
- * Remove a decodeGroupMember from the decodeGroupMembership of
- * decodeGroupLeader
- * Assumes that the caller is holding appropriate lock
+ * RemoveDecodeGroupMemberLocked
+ *		Remove a member from a decoding group of a leader.
+ *
+ * Assumes that the caller is holding appropriate lock on PGPROC.
  */
 void
 RemoveDecodeGroupMemberLocked(PGPROC *leader)
@@ -2059,20 +2130,24 @@ RemoveDecodeGroupMemberLocked(PGPROC *leader)
 }
 
 /*
- * Indicate to all decodeGroupMembers that this transaction is
- * going away.
+ * LogicalDecodeRemoveTransaction
+ *		Notify all decoding members that this transaction is going away.
  *
- * Wait for all decodeGroupMembers to ack back before returning
- * from here but only in case of aborts.
+ * Wait for all decodeGroupMembers to ack back before returning from
+ * here but only in case of aborts.
  *
- * This function should be called *after* the proc has been
- * removed from the procArray.
+ * This function should be called *after* the proc has been removed
+ * from the procArray.
  *
- * If the transaction is committing, it's ok for the
- * decoders to continue merrily. When it tries to lock this
- * proc, it won't find it and check for transaction status
- * and cache the commit status for future calls in
- * LogicalLockTransaction
+ * If the transaction is committing, it's ok for the decoding backends
+ * to continue merrily - there is no danger in accessing catalogs. When
+ * it tries to join the decoding group, it won't find the proc anymore,
+ * forcing it to re-check transaction status and cache the commit
+ * status for future calls (see LogicalLockTransaction).
+ *
+ * XXX What happens when a decoding process joins a decoding group, but
+ * then dies/crashes before leaving it again? Won't it stay in the group
+ * forever, blocking the abort?
  */
 void
 LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit)
@@ -2085,10 +2160,14 @@ LogicalDecodeRemoveTransaction(PGPROC *leader, bool isCommit)
 
 	leader_lwlock = LockHashPartitionLockByProc(leader);
 	LWLockAcquire(leader_lwlock, LW_EXCLUSIVE);
-	/* mark ourself as aborting */
-	if (!isCommit)
-		leader->decodeAbortPending = true;
 
+	/* mark the transaction as aborting */
+	leader->decodeAbortPending = (!isCommit);
+
+	/*
+	 * If the proc has not been initialized as a group leader, there are
+	 * no group members to wait for and we can terminate right away.
+	 */
 	if (leader->decodeGroupLeader == NULL)
 	{
 		Assert(dlist_is_empty(&leader->decodeGroupMembers));
@@ -2102,18 +2181,27 @@ recheck:
 	Assert(!dlist_is_empty(&leader->decodeGroupMembers));
 	if (!isCommit)
 	{
+		/*
+		 * We need to walk the list of group members, and decide if we
+		 * need to wait for some of them. In other words, we need to
+		 * check if there are any processes besides the leader.
+		 */
 		dlist_foreach(iter, &leader->decodeGroupMembers)
 		{
 			proc = dlist_container(PGPROC, decodeGroupLink, iter.cur);
-			/* mark the proc to indicate abort is pending */
+
+			/* Ignore the leader (i.e. ourselves). */
 			if (proc == leader)
 				continue;
+
+			/* mark the proc to indicate abort is pending */
 			if (!proc->decodeAbortPending)
 			{
 				proc->decodeAbortPending = true;
 				elog(DEBUG1, "marking group member (%p) from (%p) for abort",
 					 proc, leader);
 			}
+
 			/* if the proc is currently locked, wait */
 			if (proc->decodeLocked)
 				do_wait = true;
