Dear Wang,
Thanks for updating the patch! The following are my comments on v33-0001.
===
libpqwalreceiver.c
01. inclusion
```
+#include "catalog/pg_subscription.h"
```
We don't need to include this header because the parameters are analyzed by the
caller.
===
launcher.c
02. logicalrep_worker_launch()
```
+ /*
+ * Return silently if the number of parallel apply workers reached the
+ * limit per subscription.
+ */
+ if (is_subworker && nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
```
a.
I think it would be helpful to output a debug message here.
b.
The if statement seems to be longer than 80 characters. You can break the line
before "nparallelapplyworkers >= ...".
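To illustrate both points, here is a rough standalone sketch (not the patch code). The function name parallel_apply_slot_available() is hypothetical, the variable standing in for the GUC uses the default from the hunk quoted in comment 12, and fprintf() is only a placeholder for whatever debug-level logging call fits in launcher.c.

```c
#include <stdbool.h>
#include <stdio.h>

/* Stand-in for the GUC; 2 is the default shown in the guc_tables.c hunk. */
static int	max_parallel_apply_workers_per_subscription = 2;

/*
 * Hypothetical helper: returns false when a parallel apply worker cannot be
 * launched because the per-subscription limit has been reached.
 */
bool
parallel_apply_slot_available(bool is_subworker, int nparallelapplyworkers)
{
	/* Condition broken before "nparallelapplyworkers" to stay under 80 columns. */
	if (is_subworker &&
		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
	{
		/* Placeholder for a debug-level log message in the real code. */
		fprintf(stderr,
				"parallel apply worker limit reached (%d of %d in use)\n",
				nparallelapplyworkers,
				max_parallel_apply_workers_per_subscription);
		return false;
	}

	return true;
}
```

With the limit at 2, a request made while two parallel apply workers are running is rejected with a message, while requests that are not for a sub-worker are unaffected by the check.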
===
applyparallelworker.c
03. declaration
```
+/*
+ * Is there a message pending in parallel apply worker which we need to
+ * receive?
+ */
+volatile bool ParallelApplyMessagePending = false;
```
I checked other flags that are set by signal handlers, and their data type seems
to be sig_atomic_t.
Is there any reason to use a plain bool here? If not, it should be changed.
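For reference, the usual pattern looks roughly like the standalone sketch below. The names MessagePending, handle_message_signal() and consume_pending_message() are made up for illustration and are not from the patch; the point is only that the flag written by the handler is declared volatile sig_atomic_t.

```c
#include <signal.h>
#include <stdbool.h>

/* Hypothetical flag, written by the signal handler and read by the main loop. */
static volatile sig_atomic_t MessagePending = false;

/* Signal handler: does nothing except set the flag. */
void
handle_message_signal(int signo)
{
	(void) signo;				/* unused */
	MessagePending = true;
}

/*
 * Called from the main loop. Returns true if a message was pending and
 * clears the flag so that the next signal is noticed again.
 */
bool
consume_pending_message(void)
{
	if (!MessagePending)
		return false;

	MessagePending = false;
	return true;
}
```

sig_atomic_t is the only integer type the C standard allows a signal handler to write to safely, which I assume is why the existing flags use it.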
04. HandleParallelApplyMessages()
```
+ if (winfo->error_mq_handle == NULL)
+ continue;
```
a.
I was not sure when the cell should be cleaned up. Currently we clean up
ParallelApplyWorkersList only in parallel_apply_start_worker(),
but we could also remove such a cell in HandleParallelApplyMessages() or
HandleParallelApplyMessage(). What do you think?
b.
Even if we keep this, a comment should be added, such as "exited worker, skipped".
```
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to the leader apply worker")));
```
c.
This function is called on the leader apply worker, so the message should be
"lost connection to the parallel apply worker".
05. parallel_apply_setup_worker()
```
+ if (launched)
+ {
+ ParallelApplyWorkersList = lappend(ParallelApplyWorkersList, winfo);
+ }
```
The braces are unnecessary around a single statement and should be removed.
06. parallel_apply_wait_for_xact_finish()
```
+ /* If any workers have died, we have failed. */
```
This function checks only one parallel apply worker, so should the comment
be "If the worker has..."?
===
worker.c
07. handle_streamed_transaction()
```
+ * For non-streamed transactions, returns false;
```
"returns false;" -> "returns false"
apply_handle_commit_prepared(), apply_handle_abort_prepared()
Parallel apply workers are not expected to call these functions,
so I think an Assert() should be added.
08. UpdateWorkerStats()
```
-static void
+void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
```
This function is called only in worker.c, so it should remain static.
09. subscription_change_cb()
```
-static void
+void
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
```
This function is called only in worker.c, so it should remain static.
10. InitializeApplyWorker()
```
+/*
+ * Initialize the database connection, in-memory subscription and necessary
+ * config options.
+ */
void
-ApplyWorkerMain(Datum main_arg)
+InitializeApplyWorker(void)
```
A comment should be added explaining that this is the part common to the leader
and parallel apply workers.
===
logicalrepworker.h
11. declaration
```
extern PGDLLIMPORT volatile bool ParallelApplyMessagePending;
```
Please see comment 03 above.
===
guc_tables.c
12. ConfigureNamesInt
```
+ {
+ {"max_parallel_apply_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel apply workers per subscription."),
+ NULL,
+ },
+ &max_parallel_apply_workers_per_subscription,
+ 2, 0, MAX_BACKENDS,
+ NULL, NULL, NULL
+ },
```
This parameter can be changed by pg_ctl reload, so the following corner case
may occur.
Should we add an assign hook to handle this? Or can we ignore it?
1. Set max_parallel_apply_workers_per_subscription to 4.
2. Start replicating two streaming transactions.
3. Commit the transactions.
=== Two parallel workers remain ===
4. Change max_parallel_apply_workers_per_subscription to 3.
5. We expect only one worker to remain, but two parallel workers remain.
The extra worker will not be stopped until another streamed transaction is started and
committed.
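To make the arithmetic in the steps above concrete, here is a small standalone sketch. It assumes that idle workers are retained up to half of the limit (integer division), which is my reading of the numbers in steps 1 to 5 and not something I am quoting from the patch. The function name idle_workers_to_stop() is hypothetical.

```c
/*
 * Hypothetical helper: how many idle parallel apply workers would have to be
 * stopped right after the limit changes.
 *
 * Assumption: at most limit / 2 idle workers are kept (integer division).
 */
int
idle_workers_to_stop(int idle_workers, int limit)
{
	int			keep = limit / 2;

	return (idle_workers > keep) ? idle_workers - keep : 0;
}
```

Under that assumption, two idle workers are within bounds for a limit of 4, but one of them is in excess once the limit becomes 3. An assign hook (or a check after the configuration reload) could use a calculation like this to stop the excess worker immediately.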
Best Regards,
Hayato Kuroda
FUJITSU LIMITED