Dear Hou,

Thanks for making the patch. The following are my comments on v54-0003 and 0004.

0003

pa_free_worker()

+       /* Unlink any files that were needed to serialize partial changes. */
+       if (winfo->serialize_changes)
+               stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
+

I think this part is not needed, because the leader apply worker (LA) cannot
reach here if winfo->serialize_changes is true. Moreover, stream_cleanup_files()
is already done in pa_free_worker_info().

LogicalParallelApplyLoop()

The parallel apply worker wakes up every 0.1s even when it is in
PARTIAL_SERIALIZE mode. Do you have any idea how to reduce that?

```
+                       pa_spooled_messages();
```

Comments are needed here, like "Changes may be serialized...".

pa_stream_abort()

```
+                               /*
+                                * Reopen the file and set the file position to the saved
+                                * position.
+                                */
+                               if (reopen_stream_fd)
+                               {
+                                       char            path[MAXPGPATH];
+
+                                       changes_filename(path, MyLogicalRepWorker->subid, xid);
+                                       stream_fd = BufFileOpenFileSet(&MyParallelShared->fileset,
+                                                                      path, O_RDONLY, false);
+                                       BufFileSeek(stream_fd, fileno, offset, SEEK_SET);
+                               }
```

MyParallelShared->serialize_changes may be used instead of reopen_stream_fd.


worker.c

```
-#include "storage/buffile.h"
```

I think this include should not be removed.


handle_streamed_transaction()

```
+                       if (apply_action == TRANS_LEADER_SEND_TO_PARALLEL)
+                               pa_send_data(winfo, s->len, s->data);
+                       else
+                               stream_write_change(action, &original_msg);
```

Comments are needed here. 0001 has them, but they were removed in 0002.
There are some similar lines elsewhere.


```
+                       /*
+                        * It is possible that while sending this change to parallel apply
+                        * worker we need to switch to serialize mode.
+                        */
+                       if (winfo->serialize_changes)
+                               pa_set_fileset_state(winfo->shared, FS_READY);
```

The same code appears in three places. Can we factor it out into a common function?
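
A rough sketch of such a helper is below. It is only an illustration: every
Sketch*/sketch_* name is a hypothetical stand-in for the patch's types and for
pa_set_fileset_state(), not the real definitions, and the "not ready" state is
a placeholder.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins, NOT the real PostgreSQL definitions. */
typedef enum SketchFileSetState
{
	SKETCH_FS_NOT_READY,		/* hypothetical placeholder state */
	SKETCH_FS_READY				/* stands in for FS_READY */
} SketchFileSetState;

typedef struct SketchShared
{
	SketchFileSetState fileset_state;
} SketchShared;

typedef struct SketchWorkerInfo
{
	bool		serialize_changes;
	SketchShared *shared;
} SketchWorkerInfo;

/* Stand-in for pa_set_fileset_state(). */
void
sketch_set_fileset_state(SketchShared *shared, SketchFileSetState state)
{
	shared->fileset_state = state;
}

/*
 * Hypothetical common helper: if sending the change made the leader switch
 * to serialize mode, tell the parallel apply worker the file is ready.
 * Each of the three call sites would call this instead of repeating the
 * if-block.
 */
void
sketch_mark_fileset_ready_if_serialized(SketchWorkerInfo *winfo)
{
	assert(winfo != NULL && winfo->shared != NULL);

	if (winfo->serialize_changes)
		sketch_set_fileset_state(winfo->shared, SKETCH_FS_READY);
}
```

With a helper like this, the explanatory comment would also live in one place.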

apply_spooled_messages()

```
+               /*
+                * Break the loop if the parallel apply worker has finished applying
+                * the transaction. The parallel apply worker should have closed the
+                * file before committing.
+                */
+               if (am_parallel_apply_worker() &&
+                       MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
+                       goto done;
```

I think pfree(buffer) and pfree(s2.data) should not be skipped.
Also, this part should be placed below "nchanges++;".
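
The control flow I have in mind can be sketched as follows. This is a
self-contained toy, not the patch code: malloc/free stand in for
palloc/pfree, the sketch_* names and the finished_after parameter are
hypothetical, and the "finished" test stands in for the
PARALLEL_TRANS_FINISHED check.

```c
#include <assert.h>
#include <stdlib.h>

/* Counts live allocations so the sketch can show that nothing leaks. */
int			sketch_live_allocs = 0;

void *
sketch_alloc(size_t n)
{
	void	   *p = malloc(n);

	assert(p != NULL);
	sketch_live_allocs++;
	return p;
}

void
sketch_free(void *p)
{
	free(p);
	sketch_live_allocs--;
}

/*
 * Applies up to total_changes changes, stopping early once finished_after
 * changes have been applied. Returns the number of changes applied.
 */
int
sketch_apply_spooled(int total_changes, int finished_after)
{
	char	   *buffer = sketch_alloc(64);	/* stands in for "buffer" */
	char	   *s2_data = sketch_alloc(64); /* stands in for "s2.data" */
	int			nchanges = 0;

	while (nchanges < total_changes)
	{
		/* ... read and apply one change here ... */
		nchanges++;

		/* Early-exit check placed after nchanges++. */
		if (nchanges == finished_after)
			break;
	}

	/* Both exit paths fall through to the same cleanup. */
	sketch_free(buffer);
	sketch_free(s2_data);
	return nchanges;
}
```

Using break instead of jumping past the cleanup means the early exit and the
normal end of the loop release the buffers in the same way.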


0004

set_subscription_retry()

```
+       LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
+                                        AccessShareLock);
+
```

I think AccessExclusiveLock should be acquired instead of AccessShareLock.
AlterSubscription() seems to use LockSharedObject() with AccessExclusiveLock.
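
For reference, the difference between the two modes can be sketched as below.
The enum and function names are hypothetical and cover only these two lock
modes. The rule itself follows the PostgreSQL lock conflict table:
AccessShareLock conflicts only with AccessExclusiveLock, while
AccessExclusiveLock conflicts with every mode.

```c
#include <stdbool.h>

/* Only the two modes discussed here; names are hypothetical. */
typedef enum SketchLockMode
{
	SKETCH_ACCESS_SHARE,
	SKETCH_ACCESS_EXCLUSIVE
} SketchLockMode;

/*
 * Returns true if a lock request in mode "requested" must wait for a lock
 * already held in mode "held" on the same object.
 */
bool
sketch_lock_modes_conflict(SketchLockMode held, SketchLockMode requested)
{
	return held == SKETCH_ACCESS_EXCLUSIVE ||
		requested == SKETCH_ACCESS_EXCLUSIVE;
}
```

So two backends that both take AccessShareLock on the subscription do not
exclude each other, whereas AccessExclusiveLock serializes them.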

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
