On Wed, Nov 30, 2022 at 4:23 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> 2.
> +    /*
> +     * The stream lock is released when processing changes in a
> +     * streaming block, so the leader needs to acquire the lock here
> +     * before entering PARTIAL_SERIALIZE mode to ensure that the
> +     * parallel apply worker will wait for the leader to release the
> +     * stream lock.
> +     */
> +    if (in_streamed_transaction &&
> +        action != LOGICAL_REP_MSG_STREAM_STOP)
> +    {
> +        pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
>
> This comment is not completely correct because we can even acquire the
> lock for the very streaming chunk. This check will work but doesn't
> appear future-proof or at least not very easy to understand though I
> don't have a better suggestion at this stage. Can we think of a better
> check here?
>
One idea is that we acquire this lock every time and make callers like
stream_commit responsible for releasing it. Also, we can handle closing
the stream file in the respective callers. I think that will make this
part of the patch easier to follow.

Some other comments:
=====================
1. The handling of buffile inside pa_stream_abort() looks a bit ugly to
me. I think you primarily required it because the buffile opened by the
parallel apply worker is in CurrentResourceOwner. Can we think of having
a new resource owner to apply spooled messages? I think that will avoid
the need for special-purpose code to handle buffiles in the parallel
apply worker.

2.
@@ -564,6 +571,7 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
     TransactionId current_xid;
     ParallelApplyWorkerInfo *winfo;
     TransApplyAction apply_action;
+    StringInfoData original_msg;

     apply_action = get_transaction_apply_action(stream_xid, &winfo);

@@ -573,6 +581,8 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)

     Assert(TransactionIdIsValid(stream_xid));

+    original_msg = *s;
+
     /*
      * We should have received XID of the subxact as the first part of the
      * message, so extract it.
@@ -596,10 +606,14 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
             stream_write_change(action, s);
             return true;

+        case TRANS_LEADER_PARTIAL_SERIALIZE:
         case TRANS_LEADER_SEND_TO_PARALLEL:
             Assert(winfo);

-            pa_send_data(winfo, s->len, s->data);
+            if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+                pa_send_data(winfo, s->len, s->data);
+            else
+                stream_write_change(action, &original_msg);

Please add a comment explaining why the original message needs to be
remembered.

3.
@@ -1797,8 +1907,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
     changes_filename(path, MyLogicalRepWorker->subid, xid);
     elog(DEBUG1, "replaying changes from file \"%s\"", path);

-    fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
-                            false);
+    stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
+    stream_xid = xid;

Why do we need stream_xid here? I think we can avoid having a global
stream_fd if comment #1 is feasible.

4.
+ * TRANS_LEADER_APPLY:
+ *    The action means that we

/The/This. Please make a similar change for the other actions.

5. Apart from the above, please find a few changes to the comments for
the 0001 and 0002 patches in the attached patches.

-- 
With Regards,
Amit Kapila.
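To make the stream-lock suggestion concrete, here is a toy model of the ownership rule it implies: the code that switches to PARTIAL_SERIALIZE takes the lock unconditionally, and the caller that finishes the streamed transaction (a stream-commit handler in this sketch) closes the spool file and releases the lock. ToyLeader, toy_enter_partial_serialize() and toy_stream_commit() are hypothetical names. The real code would use pa_lock_stream() on a heavyweight lock and a BufFile, neither of which is modelled here, and the parallel apply worker's side is not modelled at all.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the leader's per-transaction state. The lock and
 * the spool file are reduced to a counter and a flag. */
typedef struct ToyLeader
{
    int     stream_lock_held;   /* times the leader currently holds the lock */
    bool    spool_file_open;    /* is the partial-serialize file open? */
    bool    serializing;        /* in PARTIAL_SERIALIZE mode? */
} ToyLeader;

/* Entering PARTIAL_SERIALIZE: open the file and take the lock every time,
 * without testing in_streamed_transaction or the message type. */
static void
toy_enter_partial_serialize(ToyLeader *l)
{
    assert(!l->serializing);
    l->serializing = true;
    l->spool_file_open = true;
    l->stream_lock_held++;
}

/* The caller that ends the streamed transaction owns the cleanup:
 * close the file, then drop the lock taken above. */
static void
toy_stream_commit(ToyLeader *l)
{
    if (!l->serializing)
        return;                 /* nothing was spooled, nothing to undo */
    assert(l->stream_lock_held > 0);
    l->spool_file_open = false;
    l->stream_lock_held--;
    l->serializing = false;
}
```

The intent is that acquire and release each live in exactly one place, so the reader never has to work out from a condition whether the lock is already held.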
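On comment #2, the need for original_msg appears to come from the read cursor: `original_msg = *s` is a shallow struct copy, so it shares s->data but keeps the cursor where it was before the subxact XID is extracted. Assuming stream_write_change() writes from s->cursor to the end of the buffer (as its current implementation does), writing &original_msg spools the XID as well, matching what pa_send_data(winfo, s->len, s->data) sends in the non-serialize path; presumably the parallel apply worker needs that XID when it applies the change. The sketch models only the cursor behaviour. MsgBuf, msg_read_uint32(), bytes_from_cursor() and spooled_bytes() are hypothetical, and byte order and the length/action header that stream_write_change() also writes are ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for StringInfoData: a buffer plus a read cursor.
 * Copying the struct copies the pointer and the cursor, not the bytes. */
typedef struct MsgBuf
{
    const char *data;
    int         len;
    int         cursor;
} MsgBuf;

/* Hypothetical reader in the spirit of pq_getmsgint(s, 4): consumes four
 * bytes and advances the cursor (byte-order conversion omitted). */
static uint32_t
msg_read_uint32(MsgBuf *s)
{
    uint32_t    v;

    assert(s->len - s->cursor >= (int) sizeof(v));
    memcpy(&v, s->data + s->cursor, sizeof(v));
    s->cursor += (int) sizeof(v);
    return v;
}

/* Number of payload bytes a writer that starts at the cursor would spool. */
static int
bytes_from_cursor(const MsgBuf *s)
{
    return s->len - s->cursor;
}

/* Extract the subxact XID, then report how much would be spooled from
 * either the saved copy (keep_xid) or the advanced buffer. */
static int
spooled_bytes(MsgBuf *s, bool keep_xid, uint32_t *subxid)
{
    MsgBuf      original_msg = *s;  /* taken before the XID is consumed */

    *subxid = msg_read_uint32(s);   /* moves s->cursor past the XID */
    return keep_xid ? bytes_from_cursor(&original_msg)
                    : bytes_from_cursor(s);
}
```

With a 10-byte message whose first four bytes are the XID, the saved copy yields all 10 bytes, while the advanced buffer yields only the 6 bytes after the XID.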
changes_amit_v54_0001.patch
changes_amit_v54_0002.patch