Dear Hou,

Here are my comments. I'd like to consider the patch further, but I'm sending
what I have for now.

===
worker.c

01. typedef enum TransApplyAction

```
/*
 * What action to take for the transaction.
 *
 * TRANS_LEADER_APPLY means that we are in the leader apply worker and changes
 * of the transaction are applied directly in the worker.
 *
 * TRANS_LEADER_SERIALIZE means that we are in the leader apply worker or table
 * sync worker. Changes are written to temporary files and then applied when
 * the final commit arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL means that we are in the leader apply worker
 * and need to send the changes to the parallel apply worker.
 *
 * TRANS_PARALLEL_APPLY means that we are in the parallel apply worker and
 * changes of the transaction are applied directly in the worker.
 */
```

TRANS_LEADER_PARTIAL_SERIALIZE should be listed here as well.
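
As a wording sketch only (my guess at the semantics based on the rest of the
patch; please correct it), the missing entry could read something like:

```
 * TRANS_LEADER_PARTIAL_SERIALIZE means that we are in the leader apply worker
 * and have sent some changes to the parallel apply worker, but the remaining
 * changes are serialized to a file because sending timed out; the parallel
 * apply worker will apply them when the final commit arrives.
```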

02. handle_streamed_transaction()

```
+       StringInfoData  origin_msg;
...
+       origin_msg = *s;
...
+                               /* Write the change to the current file */
+                               stream_write_change(action,
+                                                   apply_action == TRANS_LEADER_SERIALIZE ?
+                                                   s : &origin_msg);
```

I'm not sure why origin_msg is needed. Can we remove the conditional operator?


03. apply_handle_stream_start()

```
+ * XXX We can avoid sending pairs of the START/STOP messages to the parallel
+ * worker because unlike apply worker it will process only one transaction at a
+ * time. However, it is not clear whether any optimization is worthwhile
+ * because these messages are sent only when the logical_decoding_work_mem
+ * threshold is exceeded.
```

This comment should be updated, because PA must acquire and release locks when
handling these messages.


04. apply_handle_stream_prepare()

```
+                       /*
+                        * After sending the data to the parallel apply worker,
+                        * wait for that worker to finish. This is necessary to
+                        * maintain commit order which avoids failures due to
+                        * transaction dependencies and deadlocks.
+                        */
+                       parallel_apply_wait_for_xact_finish(winfo->shared);
```

This comment does not seem correct here: LA may not have sent data to PA; it may
instead have spilled the changes to a file.

05. apply_handle_stream_commit()

```
+                       if (apply_action == TRANS_LEADER_PARTIAL_SERIALIZE)
+                       if (apply_action == TRANS_LEADER_PARTIAL_SERIALIZE)
+                               stream_cleanup_files(MyLogicalRepWorker->subid, xid);
```

I'm not sure whether the stream files should be removed by LA or by PAs. Could
you tell me why you chose LA?

===
applyparallelworker.c

06. parallel_apply_can_start()

```
+       if (switching_to_serialize)
+               return false;
```

Could you add a comment like the following?
Don't start a new parallel apply worker if the leader apply worker is
temporarily spilling changes to disk.

07. parallel_apply_start_worker()

```
+       /*
+        * Set the xact_state flag in the leader instead of the parallel apply
+        * worker to avoid the race condition where the leader has already
+        * started waiting for the parallel apply worker to finish processing
+        * the transaction while the child process has not yet processed the
+        * first STREAM_START and has not set the xact_state to true.
+        */
```

I think the word "flag" should be reserved for booleans, so this comment should
be modified.
(There are many such code comments; all of them should be updated.)


08. parallel_apply_get_unique_id()

```
+/*
+ * Returns the unique id among all parallel apply workers in the subscriber.
+ */
+static uint16
+parallel_apply_get_unique_id()
```

I think this function is inefficient: its cost grows linearly with the number of
PAs. Perhaps the Bitmapset data structure could be used instead.

09. parallel_apply_send_data()

```
#define CHANGES_THRESHOLD       1000
#define SHM_SEND_TIMEOUT_MS     10000
```

I think the timeout may be too long. Could you explain the background behind
this value?


10. parallel_apply_send_data()

```
                        /*
                         * Close the stream file if not in a streaming block,
                         * the file will be reopened later.
                         */
                        if (!stream_apply_worker)
                                serialize_stream_stop(winfo->shared->xid);
```

a.
IIUC the timings when LA tries to send data but stream_apply_worker is NULL are:
* apply_handle_stream_prepare,
* apply_handle_stream_start,
* apply_handle_stream_abort, and
* apply_handle_stream_commit.
At that time the TransApplyAction state may be TRANS_LEADER_SEND_TO_PARALLEL.
When should the file be closed?

b.
Even if this is needed, I think the called function should be renamed: LA may
not be handling a STREAM_STOP message here. How about close_stream_file() or
something similar?


11. parallel_apply_send_data()

```
                        /* Initialize the stream fileset. */
                        serialize_stream_start(winfo->shared->xid, true);
```

I think the called function should be renamed: LA may not be handling a
STREAM_START message here. How about open_stream_file() or something similar?

12. parallel_apply_send_data()

```
                if (++retry >= CHANGES_THRESHOLD)
                {
                        MemoryContext oldcontext;
                        StringInfoData msg;
...
                        initStringInfo(&msg);
                        appendBinaryStringInfo(&msg, data, nbytes);
...
                        switching_to_serialize = true;
                        apply_dispatch(&msg);
                        switching_to_serialize = false;

                        break;
                }
```

pfree(msg.data) may be needed.

===
13. worker_internal.h

```
+       pg_atomic_uint32        left_message;
```


ParallelApplyWorkerShared is already protected by a mutex. Why did you add an
atomic variable to the structure?

===
14. typedefs.list

ParallelTransState should be added.

===
15. General

I have already mentioned this directly, but I am pointing it out here again to
notify other members.
I hit a deadlock with two PAs. It was indeed resolved by the lock manager, but
the output was not very helpful: in the log excerpt below, the commands
executed by the apply workers are not shown. Can we improve this, or is it out
of scope?


```
2022-11-07 11:11:27.449 UTC [11262] ERROR:  deadlock detected
2022-11-07 11:11:27.449 UTC [11262] DETAIL:  Process 11262 waits for AccessExclusiveLock on object 16393 of class 6100 of database 0; blocked by process 11320.
        Process 11320 waits for ShareLock on transaction 742; blocked by process 11266.
        Process 11266 waits for AccessShareLock on object 16393 of class 6100 of database 0; blocked by process 11262.
        Process 11262: <command string not enabled>
        Process 11320: <command string not enabled>
        Process 11266: <command string not enabled>
```


Best Regards,
Hayato Kuroda
FUJITSU LIMITED
