On Mon, Sep 26, 2022 at 8:41 AM [email protected]
<[email protected]> wrote:
>
> On Thur, Sep 22, 2022 at 18:12 PM Amit Kapila <[email protected]> wrote:
>
> > 3.
> > ApplyWorkerMain()
> > {
> > ...
> > ...
> > +
> > + if (server_version >= 160000 &&
> > + MySubscription->stream == SUBSTREAM_PARALLEL)
> > + options.proto.logical.streaming = pstrdup("parallel");
> >
> > After deciding here whether the parallel streaming mode is enabled or
> > not, we recheck the same thing in apply_handle_stream_abort() and
> > parallel_apply_can_start(). In parallel_apply_can_start(), we do it
> > via two different checks. How about storing this information say in
> > structure MyLogicalRepWorker in ApplyWorkerMain() and then use it at
> > other places?
>
> Improved as suggested.
> Added a new flag "in_parallel_apply" to structure MyLogicalRepWorker.
>
Can we rename the variable in_parallel_apply to parallel_apply and set
it in logicalrep_worker_launch() instead of in
ParallelApplyWorkerMain()?
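This is a minimal standalone sketch of "decide once, store the flag, read it later". Several names here are hypothetical stand-ins, not the real PostgreSQL definitions:

- SketchLogicalRepWorker
- the sketch_* functions
- the assumed SUBSTREAM_* character values

A parallel apply worker receives the flag at launch time. The leader sets it after it knows the publisher's version. Later call sites only read it.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical character codes for the subscription's streaming option. */
#define SUBSTREAM_OFF      'f'
#define SUBSTREAM_ON       't'
#define SUBSTREAM_PARALLEL 'p'

/* Hypothetical, trimmed-down stand-in for LogicalRepWorker. */
typedef struct SketchLogicalRepWorker
{
    /* Indicates whether apply can be performed in parallel. */
    bool parallel_apply;
} SketchLogicalRepWorker;

/* Launcher side: a parallel apply worker already knows its role. */
static void
sketch_worker_launch(SketchLogicalRepWorker *worker,
                     bool is_parallel_apply_worker)
{
    memset(worker, 0, sizeof(*worker));
    worker->parallel_apply = is_parallel_apply_worker;
}

/* Leader side: decide once, after learning the publisher's version. */
static void
sketch_leader_decide(SketchLogicalRepWorker *worker,
                     int server_version, char stream)
{
    assert(stream == SUBSTREAM_OFF || stream == SUBSTREAM_ON ||
           stream == SUBSTREAM_PARALLEL);
    worker->parallel_apply = (server_version >= 160000 &&
                              stream == SUBSTREAM_PARALLEL);
}

/* Later call sites (e.g. STREAM ABORT handling) only read the flag. */
static bool
sketch_read_abort_info(const SketchLogicalRepWorker *worker)
{
    return worker->parallel_apply;
}
```

With this shape, the version and streaming checks live in one place. They no longer need to be repeated in each handler.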
A few other comments:
=====================
1.
+ if (is_subworker &&
+ nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+
+ ereport(DEBUG1,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("out of parallel apply workers"),
+ errhint("You might need to increase max_parallel_apply_workers_per_subscription.")));
I think it is better to use LOG as the level for this message. Similar
messages elsewhere use WARNING or LOG. Here, I prefer LOG because
the system can still proceed without blocking anything.
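This is a rough standalone model of the suggested behaviour. The SketchLevel enum and the sketch_* functions are hypothetical and are not elog.h.

- Reaching the per-subscription limit returns false instead of raising an error. The caller is assumed to carry on without a parallel apply worker.
- The message is reported at LOG. Unlike DEBUG1, LOG reaches the server log with the default log_min_messages = warning.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for a few elog.h severity levels. */
typedef enum { SKETCH_DEBUG1, SKETCH_LOG, SKETCH_WARNING } SketchLevel;

static SketchLevel sketch_last_level = SKETCH_DEBUG1;

/* Records the level and prints the message, loosely mimicking ereport(). */
static void
sketch_ereport(SketchLevel level, const char *msg, const char *hint)
{
    static const char *const names[] = {"DEBUG1", "LOG", "WARNING"};

    sketch_last_level = level;
    fprintf(stderr, "%s:  %s\nHINT:  %s\n", names[level], msg, hint);
}

/*
 * Returns false when the per-subscription limit is reached.  Running out
 * of workers is not treated as an error; the caller (assumed) proceeds
 * without a parallel apply worker, so LOG is used rather than DEBUG1.
 */
static bool
sketch_can_launch_parallel_worker(int nparallelapplyworkers,
                                  int max_per_subscription)
{
    assert(max_per_subscription >= 0);

    if (nparallelapplyworkers >= max_per_subscription)
    {
        sketch_ereport(SKETCH_LOG, "out of parallel apply workers",
                       "You might need to increase "
                       "max_parallel_apply_workers_per_subscription.");
        return false;
    }
    return true;
}
```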
2.
+/* Reset replication origin tracking. */
+void
+parallel_apply_replorigin_reset(void)
+{
+ bool started_tx = false;
+
+ /* This function might be called inside or outside of transaction. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
Why do we need a transaction in this function?
3. A few suggestions to improve the patch:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1623c9e2fa..d9c519dfab 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1264,6 +1264,10 @@ apply_handle_stream_prepare(StringInfo s)
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);
+ /*
+ * The origin can be active only in one process. See
+ * apply_handle_stream_commit.
+ */
parallel_apply_replorigin_reset();
/* Send STREAM PREPARE message to the parallel apply worker. */
@@ -1623,12 +1627,7 @@ apply_handle_stream_abort(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("STREAM ABORT message without STREAM STOP")));
- /*
- * Check whether the publisher sends abort_lsn and abort_time.
- *
- * Note that the parallel apply worker is only started when the publisher
- * sends abort_lsn and abort_time.
- */
+ /* We receive abort information only when we can apply in parallel. */
if (MyLogicalRepWorker->in_parallel_apply)
read_abort_info = true;
@@ -1656,7 +1655,13 @@ apply_handle_stream_abort(StringInfo s)
Assert(winfo);
if (subxid == xid)
+ {
+ /*
+ * The origin can be active only in one process. See
+ * apply_handle_stream_commit.
+ */
parallel_apply_replorigin_reset();
+ }
/* Send STREAM ABORT message to the parallel apply worker. */
parallel_apply_send_data(winfo, s->len, s->data);
@@ -1858,6 +1863,12 @@ apply_handle_stream_commit(StringInfo s)
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);
+ /*
+ * We need to reset the replication origin before sending the commit
+ * message and set it up again after confirming that the parallel apply
+ * worker has processed the message. This is required because the origin
+ * can be active in only one process at a time.
+ */
parallel_apply_replorigin_reset();
/* Send STREAM COMMIT message to the parallel apply worker. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 4cbfb43492..2bd9664f86 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -70,11 +70,7 @@ typedef struct LogicalRepWorker
*/
pid_t apply_leader_pid;
- /*
- * Indicates whether to use parallel apply workers.
- *
- * Determined based on streaming parameter and publisher version.
- */
+ /* Indicates whether apply can be performed in parallel. */
bool in_parallel_apply;
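The rule that "the origin can be active only in one process" can be modelled with a single owner slot. This is a hypothetical sketch: SketchOrigin and the sketch_* functions are not the replorigin API, and the worker's steps are simulated inline. It shows the ordering the suggested comments describe:

1. The leader resets the origin before handing the commit to the parallel apply worker.
2. The worker sets up the origin, applies the commit, and releases the origin.
3. The leader sets the origin up again only after the worker has released it.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical model of a replication origin: at most one process may
 * have it active at a time; owner_pid == 0 means nobody holds it.
 */
typedef struct SketchOrigin
{
    int owner_pid;
} SketchOrigin;

/* Fails if the origin is already active in another process. */
static bool
sketch_origin_setup(SketchOrigin *origin, int pid)
{
    if (origin->owner_pid != 0 && origin->owner_pid != pid)
        return false;
    origin->owner_pid = pid;
    return true;
}

/* Releases the origin if this process holds it. */
static void
sketch_origin_reset(SketchOrigin *origin, int pid)
{
    if (origin->owner_pid == pid)
        origin->owner_pid = 0;
}

/* Leader-side ordering for STREAM COMMIT (worker steps simulated inline). */
static bool
sketch_leader_stream_commit(SketchOrigin *origin, int leader_pid,
                            int worker_pid)
{
    /* 1. Reset before sending; otherwise the worker cannot set it up. */
    sketch_origin_reset(origin, leader_pid);

    /* 2. The worker sets up the origin, applies, then releases it. */
    if (!sketch_origin_setup(origin, worker_pid))
        return false;
    sketch_origin_reset(origin, worker_pid);
    assert(origin->owner_pid == 0);

    /* 3. After confirming the worker is done, set it up again. */
    return sketch_origin_setup(origin, leader_pid);
}
```

The same reset-before-send ordering applies to the STREAM PREPARE and STREAM ABORT cases touched in the diff.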
--
With Regards,
Amit Kapila.