Here are my review comments for patch v55-0002

======
.../replication/logical/applyparallelworker.c

1. pa_can_start

@@ -276,9 +278,9 @@ pa_can_start(TransactionId xid)
 /*
 * Don't start a new parallel worker if user has set skiplsn as it's
 * possible that user want to skip the streaming transaction. For
- * streaming transaction, we need to spill the transaction to disk so that
- * we can get the last LSN of the transaction to judge whether to skip
- * before starting to apply the change.
+ * streaming transaction, we need to serialize the transaction to a file
+ * so that we can get the last LSN of the transaction to judge whether to
+ * skip before starting to apply the change.
 */
 if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
 return false;

I think the wording change may belong in patch 0001 because it has nothing to do with partial serializing.

~~~

2. pa_free_worker

+ /*
+ * Stop the worker if there are enough workers in the pool.
+ *
+ * XXX The worker is also stopped if the leader apply worker needed to
+ * serialize part of the transaction data due to a send timeout. This is
+ * because the message could be partially written to the queue due to send
+ * timeout and there is no way to clean the queue other than resending the
+ * message until it succeeds. To avoid complexity, we directly stop the
+ * worker in this case.
+ */
+ if (winfo->serialize_changes ||
+ napplyworkers > (max_parallel_apply_workers_per_subscription / 2))

No need to say "due to send timeout" twice in two consecutive sentences.

SUGGESTION
XXX The worker is also stopped if the leader apply worker needed to serialize part of the transaction data due to a send timeout. This is because the message could be partially written to the queue but there is no way to clean the queue other than resending the message until it succeeds. Directly stopping the worker avoids needing this complexity.

~~~

3. pa_spooled_messages

Previously I suggested this function name should be changed but that was rejected (see [1] #6a)

> 6a.
> IMO a better name for this function would be
> pa_apply_spooled_messages();

Not sure about this.

~

FYI, the reason for the previous suggestion is that there is no verb in the current function name, so the reader is left thinking pa_spooled_messages "what"?

It means the caller has to have extra comments like:

/* Check if changes have been serialized to a file. */
pa_spooled_messages();

OTOH, if the function was called something better -- e.g. pa_check_for_spooled_messages() or similar -- then it would be self-explanatory.

~

4.

 /*
+ * Replay the spooled messages in the parallel apply worker if the leader apply
+ * worker has finished serializing changes to the file.
+ */
+static void
+pa_spooled_messages(void)

I'm not 100% sure of the logic, so IMO maybe the comment should say a bit more about how this works:

Specifically, let's say there was some timeout and the LA needed to write the spool file, then let's say the PA timed out and found itself inside this function. Now, let's say the LA is still busy writing the file -- so what happens next?

Does this function simply return, then the main PA loop waits again, then it times out again, then the PA finds itself back inside this function again... and that keeps happening over and over until eventually the spool file is found FS_READY? Some explanatory comments might help.

~

5.

+ /*
+ * Check if changes have been serialized to a file. if so, read and apply
+ * them.
+ */
+ SpinLockAcquire(&MyParallelShared->mutex);
+ fileset_state = MyParallelShared->fileset_state;
+ SpinLockRelease(&MyParallelShared->mutex);

"if so" -> "If so"

~~~

6. pa_send_data

+ *
+ * If the attempt to send data via shared memory times out, then we will switch
+ * to "PARTIAL_SERIALIZE mode" for the current transaction to prevent possible
+ * deadlocks with another parallel apply worker (refer to the comments atop
+ * applyparallelworker.c for details). This means that the current data and any
+ * subsequent data for this transaction will be serialized to a file.
 */
 void
 pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)

SUGGESTION (minor comment rearranging)
If the attempt to send data via shared memory times out, then we will switch to "PARTIAL_SERIALIZE mode" for the current transaction -- this means that the current data and any subsequent data for this transaction will be serialized to a file. This is done to prevent possible deadlocks with another parallel apply worker (refer to the comments atop applyparallelworker.c for details).

~

7.

+ /*
+ * Take the stream lock to make sure that the parallel apply worker
+ * will wait for the leader to release the stream lock until the
+ * end of the transaction.
+ */
+ pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);

The comment doesn't sound right.

"until the end" -> "at the end" (??)

~~~

8. pa_stream_abort

@@ -1374,6 +1470,7 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
 RollbackToSavepoint(spname);
 CommitTransactionCommand();
 subxactlist = list_truncate(subxactlist, i + 1);
+
 break;
 }
 }

Spurious whitespace unrelated to this patch?

======
src/backend/replication/logical/worker.c

9. handle_streamed_transaction

 /*
+ * The parallel apply worker needs the xid in this message to decide
+ * whether to define a savepoint, so save the original message that has not
+ * moved the cursor after the xid. We will serailize this message to a file
+ * in PARTIAL_SERIALIZE mode.
+ */
+ original_msg = *s;

"serailize" -> "serialize"

~~~

10. apply_handle_stream_prepare

@@ -1245,6 +1265,7 @@ apply_handle_stream_prepare(StringInfo s)
 LogicalRepPreparedTxnData prepare_data;
 ParallelApplyWorkerInfo *winfo;
 TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

Should this include a longer explanation of why this copy is needed (same as was done in handle_streamed_transaction)?

~

11.
 case TRANS_PARALLEL_APPLY:
+
+ /*
+ * Close the file before committing if the parallel apply worker
+ * is applying spooled messages.
+ */
+ if (stream_fd)
+ stream_close_file();

11a.
This comment seems worded backwards.

SUGGESTION
If the parallel apply worker is applying spooled messages then close the file before committing.

~

11b.
I'm confused -- isn't there code doing exactly this (closing the file before commit) already in the TRANS_PARALLEL_APPLY case of apply_handle_stream_commit?

~~~

12. apply_handle_stream_start

@@ -1383,6 +1493,7 @@ apply_handle_stream_start(StringInfo s)
 bool first_segment;
 ParallelApplyWorkerInfo *winfo;
 TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

Should this include a longer explanation of why this copy is needed (same as was done in handle_streamed_transaction)?

~

13.

+ serialize_stream_start(stream_xid, false);
+ stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
 
- end_replication_step();
 break;

A spurious blank line is left before the "break;".

~~~

14. serialize_stream_stop

+ /* We must be in a valid transaction state */
+ Assert(IsTransactionState());

The comment seems redundant. The code says the same.

~~~

15. apply_handle_stream_abort

@@ -1676,6 +1794,7 @@ apply_handle_stream_abort(StringInfo s)
 LogicalRepStreamAbortData abort_data;
 ParallelApplyWorkerInfo *winfo;
 TransApplyAction apply_action;
+ StringInfoData original_msg = *s;
 bool toplevel_xact;

Should this include a longer explanation of why this copy is needed (same as was done in handle_streamed_transaction)?

~~~

16. apply_spooled_messages

+ stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);

Something still seems a bit odd about this to me (previously also mentioned in review [1] #29) but I cannot quite put my finger on it...

AFAIK the 'stream_fd' is the global the LA is using to remember the single stream spool file; it corresponds to the LogicalRepWorker's 'stream_fileset'.
So using that same global on the PA side somehow seemed strange to me. The fileset at PA comes from a different place (MyParallelShared->fileset).

Basically, I felt that whenever you are using 'stream_fd' and 'stream_fileset' etc. then it should be safe to assume you are looking at worker.c code from the leader apply worker's POV. Otherwise, IMO it should just use some fd/fs passed around as parameters. Sure, there might be a few places like stream_close_file (etc.) which need some small refactoring to pass a parameter instead of always using 'stream_fd', but IMO the end result will be tidier.

~

17.

+ /*
+ * No need to output the DEBUG message here in the parallel apply
+ * worker as similar messages will be output when handling STREAM_STOP
+ * message.
+ */
+ if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
 elog(DEBUG1, "replayed %d changes from file \"%s\"",
 nchanges, path);

Instead of saying what you are not doing ("No need to... in the parallel apply worker"), wouldn't it make more sense to reverse it and say what you are doing ("Only log DEBUG messages for the leader apply worker because ...")? Then the condition also becomes positive:

if (am_leader_apply_worker())
{
...
}

~

18.

+ if (am_parallel_apply_worker() &&
+ MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
+ goto done;
+
+ /*
+ * No need to output the DEBUG message here in the parallel apply
+ * worker as similar messages will be output when handling STREAM_STOP
+ * message.
+ */
+ if (!am_parallel_apply_worker() && nchanges % 1000 == 0)
 elog(DEBUG1, "replayed %d changes from file \"%s\"",
 nchanges, path);
 }
- BufFileClose(fd);
-
+ stream_close_file();
 pfree(buffer);
 pfree(s2.data);
+done:
 elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
 nchanges, path);

Shouldn't that "done:" label be *above* the pfree's? Otherwise, those are going to be skipped over by the "goto done;".

~~~

19. apply_handle_stream_commit

@@ -1898,6 +2072,7 @@ apply_handle_stream_commit(StringInfo s)
 LogicalRepCommitData commit_data;
 ParallelApplyWorkerInfo *winfo;
 TransApplyAction apply_action;
+ StringInfoData original_msg = *s;

Should this include a longer explanation of why this copy is needed (same as was done in handle_streamed_transaction)?

~

20.

+ /*
+ * Close the file before committing if the parallel apply worker
+ * is applying spooled messages.
+ */
+ if (stream_fd)
+ stream_close_file();

(same as the previous review comment -- see #11)

This comment seems worded backwards.

SUGGESTION
If the parallel apply worker is applying spooled messages then close the file before committing.

======
src/include/replication/worker_internal.h

21. PartialFileSetState

+ * State of fileset in leader apply worker.
+ *
+ * FS_BUSY means that the leader is serializing changes to the file. FS_READY
+ * means that the leader has serialized all changes to the file and the file is
+ * ready to be read by a parallel apply worker.
+ */
+typedef enum PartialFileSetState

"ready to be read" sounded a bit strange.

SUGGESTION
... to the file so it is now OK for a parallel apply worker to read it.

------
[1] Houz reply to my review v51-0002 -- https://www.postgresql.org/message-id/OS0PR01MB5716350729D8C67AA8CE333194129%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia