On Wed, Dec 7, 2022 at 8:28 AM houzj.f...@fujitsu.com
<houzj.f...@fujitsu.com> wrote:
>
> Besides, I fixed a bug where there could still be messages left in memory
> queue and the PA has started to apply spooled message.
>

Few comments on the recent changes in the patch:
========================================
1. It seems you need to set FS_SERIALIZE_DONE in
stream_prepare/commit/abort. They are still directly setting the state
to READY. Am I missing something, or did you forget to change it?

2.
  case TRANS_PARALLEL_APPLY:
  pa_stream_abort(&abort_data);

+ /*
+ * Reset the stream_fd after aborting the toplevel transaction in
+ * case the parallel apply worker is applying spooled messages
+ */
+ if (toplevel_xact)
+ stream_fd = NULL;

I think we can keep the handling of the stream file the same in the
abort/commit/prepare code paths.

3. Peter has already pointed out that it would be better to add a
comment in the pa_spooled_messages() function explaining that we won't
be able to apply changes immediately after the lock is released; that
will be done in the next cycle.

4. Shall we rename FS_SERIALIZE to FS_SERIALIZE_IN_PROGRESS? That
would be consistent with FS_SERIALIZE_DONE.

5. Comment improvements:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b26d587ae4..921d973863 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1934,8 +1934,7 @@ apply_handle_stream_abort(StringInfo s)
 }

 /*
- * Check if the passed fileno and offset are the last fileno and position of
- * the fileset, and report an ERROR if not.
+ * Ensure that the passed location is fileset's end.
  */
 static void
 ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
@@ -2084,9 +2083,9 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
  nchanges++;

  /*
- * Break the loop if stream_fd is set to NULL which means the parallel
- * apply worker has finished applying the transaction. The parallel
- * apply worker should have closed the file before committing.
+ * It is possible the file has been closed because we have processed
+ * some transaction end message like stream_commit in which case that
+ * must be the last message.
  */

--
With Regards,
Amit Kapila.