On Mon, Sep 5, 2022 at 6:34 PM houzj.f...@fujitsu.com
<houzj.f...@fujitsu.com> wrote:
>
> Attach the correct patch set this time.
>
Few comments on v28-0001*:
=======================
1.
+	/* Whether the worker is processing a transaction. */
+	bool		in_use;

I think this same comment applies to the in_parallel_apply_xact flag as
well. How about: "Indicates whether the worker is available to be used
for parallel apply transaction?"?

2.
+	/*
+	 * Set this flag in the leader instead of the parallel apply worker to
+	 * avoid the race condition where the leader has already started waiting
+	 * for the parallel apply worker to finish processing the transaction(set
+	 * the in_parallel_apply_xact to false) while the child process has not yet
+	 * processed the first STREAM_START and has not set the
+	 * in_parallel_apply_xact to true.

I think part of this comment "(set the in_parallel_apply_xact to
false)" is not necessary. It will be clear without that.

3.
+	/* Create entry for requested transaction. */
+	entry = hash_search(ParallelApplyWorkersHash, &xid, HASH_ENTER, &found);
+	if (found)
+		elog(ERROR, "hash table corrupted");
...
...
+	hash_search(ParallelApplyWorkersHash, &xid, HASH_REMOVE, NULL);

It is better to have a similar elog for the HASH_REMOVE case as well.
We normally seem to have such an elog for HASH_REMOVE.

4.
 * Parallel apply is not supported when subscribing to a publisher which
+ * cannot provide the abort_time, abort_lsn and the column information used
+ * to verify the parallel apply safety.

In this comment, which column information are you referring to?

5.
+	/*
+	 * Set in_parallel_apply_xact to true again as we only aborted the
+	 * subtransaction and the top transaction is still in progress. No
+	 * need to lock here because currently only the apply leader are
+	 * accessing this flag.
+	 */
+	winfo->shared->in_parallel_apply_xact = true;

This theory sounds good to me but I think it is better to update/read
this flag under spinlock, as the patch is doing at a few other places
(see the sketch after these comments). I think that will make the code
easier to follow without worrying too much about such special cases.
There are a few asserts as well which read this flag without a lock; it
would be better to change those as well.

6.
+ * LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
+ * with support for streaming large transactions using parallel apply
+ * workers. Introduced in PG16.

How about changing it to something like:
"LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol
version where we support applying large streaming transactions in
parallel. Introduced in PG16."

7.
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	bool		write_abort_lsn = (data->protocol_version >=
+								   LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM);

 	/*
 	 * The abort should happen outside streaming block, even for streamed
@@ -1856,7 +1859,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 	Assert(rbtxn_is_streamed(toptxn));

 	OutputPluginPrepareWrite(ctx, true);
-	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn, abort_lsn,
+								  write_abort_lsn);

I think we need to send additional information if the client has used
the parallel streaming option. Also, let's keep sending subxid as we
were doing previously and add the additional parameters required (a
rough sketch follows these comments). It may be better to name
write_abort_lsn as abort_info.

8.
+	/*
+	 * Check whether the publisher sends abort_lsn and abort_time.
+	 *
+	 * Note that the paralle apply worker is only started when the publisher
+	 * sends abort_lsn and abort_time.
+	 */
+	if (am_parallel_apply_worker() ||
+		walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+		read_abort_lsn = true;
+
+	logicalrep_read_stream_abort(s, &abort_data, read_abort_lsn);

This check should match the check used for the write operation, where
we are checking the protocol version as well. There is a typo as well
in the comments (/paralle/parallel).
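To be a bit more concrete about (5) and (7), the following is roughly
what I have in mind. These are only sketches: I am assuming the shared
struct has a spinlock protecting its fields (called 'mutex' here), and
the parameter names are only indicative.

For (5), take the spinlock around the flag update, the same way the
patch already does for the other shared fields:

	SpinLockAcquire(&winfo->shared->mutex);
	winfo->shared->in_parallel_apply_xact = true;
	SpinLockRelease(&winfo->shared->mutex);

For (7), the write function could keep sending subxid as before and
send the abort details only when the subscriber can use them:

void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
							  TransactionId subxid, XLogRecPtr abort_lsn,
							  TimestampTz abort_time, bool abort_info)
{
	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);

	Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));

	/* transaction ID */
	pq_sendint32(out, xid);
	pq_sendint32(out, subxid);

	/* send abort info only if the subscriber requested parallel streaming */
	if (abort_info)
	{
		pq_sendint64(out, abort_lsn);
		pq_sendint64(out, abort_time);
	}
}

The read side in (8) would then use the same condition (protocol
version plus the parallel streaming option) to decide whether to read
these additional fields.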
--
With Regards,
Amit Kapila.