Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2024-01-10 Thread vignesh C
On Wed, 10 Jan 2024 at 15:04, Amit Kapila  wrote:
>
> On Wed, Jan 10, 2024 at 2:59 PM Shlok Kyal  wrote:
> >
> > This patch is not applying on the HEAD. Please rebase and share the
> > updated patch.
> >
>
> IIRC, there were some regressions observed with this patch. So, one
> needs to analyze those as well. I think we should mark it as "Returned
> with feedback".

Thanks, I have updated the status to "Returned with feedback".
Feel free to post an updated version with the fix for the regression
and start a new entry for the same.

Regards,
Vignesh




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2024-01-10 Thread Amit Kapila
On Wed, Jan 10, 2024 at 2:59 PM Shlok Kyal  wrote:
>
> This patch is not applying on the HEAD. Please rebase and share the
> updated patch.
>

IIRC, there were some regressions observed with this patch. So, one
needs to analyze those as well. I think we should mark it as "Returned
with feedback".

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2024-01-10 Thread Shlok Kyal
Hi,

This patch is not applying on the HEAD. Please rebase and share the
updated patch.

Thanks and Regards
Shlok Kyal

On Wed, 10 Jan 2024 at 14:55, Peter Smith  wrote:
>
> Oops - now with attachments
>
> On Mon, Aug 21, 2023 at 5:56 PM Peter Smith  wrote:
>>
>> Hi Melih,
>>
>> Last week we revisited your implementation of design#2. Vignesh rebased it, 
>> and then made a few other changes.
>>
>> PSA v28*
>>
>> The patch changes include:
>> * changed the logic slightly by setting recv_immediately(new variable), if 
>> this variable is set the main apply worker loop will not wait in this case.
>> * setting the relation state to ready immediately if there are no more 
>> incremental changes to be synced.
>> * receive the incremental changes if applicable and set the relation state 
>> to ready without waiting.
>> * reuse the worker if the worker is free before trying to start a new table 
>> sync worker
>> * restarting the tablesync worker only after wal_retrieve_retry_interval
>>
>> ~
>>
>> FWIW, we just wanted to share with you the performance measurements seen 
>> using this design#2 patch set:
>>
>> ==
>>
>> RESULTS (not busy tests)
>>
>> --
>> 10 empty tables
>> 2w  4w  8w  16w
>> HEAD:   125 119 140 133
>> HEAD+v28*:  92  93  123 134
>> %improvement:   27% 22% 12% -1%
>> --
>> 100 empty tables
>> 2w  4w  8w  16w
>> HEAD:   1037843 11091155
>> HEAD+v28*:  591 625 26162569
>> %improvement:   43% 26% -136%   -122%
>> --
>> 1000 empty tables
>> 2w  4w  8w  16w
>> HEAD:   15874   10047   991910338
>> HEAD+v28*:  33673   12199   90949896
>> %improvement:   -112%   -21%8%  4%
>> --
>> 2000 empty tables
>> 2w  4w  8w  16w
>> HEAD:   45266   24216   19395   19820
>> HEAD+v28*:  88043   21550   21668   22607
>> %improvement:  -95% 11% -12%-14%
>>
>> ~~~
>>
>> Note - the results were varying quite a lot in comparison to the HEAD
>> e.g. HEAD results are very consistent, but the v28* results observed are not
>> HEAD 1000 (2w): 15861, 15777, 16007, 15950, 15886, 15740, 15846, 15740, 
>> 15908, 15940
>> v28* 1000 (2w):  34214, 13679, 8792, 33289, 31976, 56071, 57042, 56163, 
>> 34058, 11969
>>
>> --
>> Kind Regards,
>> Peter Smith.
>> Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-21 Thread Peter Smith
Oops - now with attachments

On Mon, Aug 21, 2023 at 5:56 PM Peter Smith  wrote:

> Hi Melih,
>
> Last week we revisited your implementation of design#2. Vignesh rebased
> it, and then made a few other changes.
>
> PSA v28*
>
> The patch changes include:
> * changed the logic slightly by setting recv_immediately(new variable), if
> this variable is set the main apply worker loop will not wait in this case.
> * setting the relation state to ready immediately if there are no more
> incremental changes to be synced.
> * receive the incremental changes if applicable and set the relation state
> to ready without waiting.
> * reuse the worker if the worker is free before trying to start a new
> table sync worker
> * restarting the tablesync worker only after wal_retrieve_retry_interval
>
> ~
>
> FWIW, we just wanted to share with you the performance measurements seen
> using this design#2 patch set:
>
> ==
>
> RESULTS (not busy tests)
>
> --
> 10 empty tables
> 2w  4w  8w  16w
> HEAD:   125 119 140 133
> HEAD+v28*:  92  93  123 134
> %improvement:   27% 22% 12% -1%
> --
> 100 empty tables
> 2w  4w  8w  16w
> HEAD:   1037843 11091155
> HEAD+v28*:  591 625 26162569
> %improvement:   43% 26% -136%   -122%
> --
> 1000 empty tables
> 2w  4w  8w  16w
> HEAD:   15874   10047   991910338
> HEAD+v28*:  33673   12199   90949896
> %improvement:   -112%   -21%8%  4%
> --
> 2000 empty tables
> 2w  4w  8w  16w
> HEAD:   45266   24216   19395   19820
> HEAD+v28*:  88043   21550   21668   22607
> %improvement:  -95% 11% -12%-14%
>
> ~~~
>
> Note - the results were varying quite a lot in comparison to the HEAD
> e.g. HEAD results are very consistent, but the v28* results observed are
> not
> HEAD 1000 (2w): 15861, 15777, 16007, 15950, 15886, 15740, 15846, 15740,
> 15908, 15940
> v28* 1000 (2w):  34214, 13679, 8792, 33289, 31976, 56071, 57042, 56163,
> 34058, 11969
>
> --
> Kind Regards,
> Peter Smith.
> Fujitsu Australia
>


v28-0001-Reuse-Tablesync-Workers.patch
Description: Binary data


v28-0002-Reuse-connection-when-tablesync-workers-change-t.patch
Description: Binary data


v28-0004-Defect-fixes.patch
Description: Binary data


v28-0003-apply-worker-assigns-tables.patch
Description: Binary data


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-21 Thread Peter Smith
Hi Melih,

Last week we revisited your implementation of design#2. Vignesh rebased it,
and then made a few other changes.

PSA v28*

The patch changes include:
* changed the logic slightly by setting recv_immediately(new variable), if
this variable is set the main apply worker loop will not wait in this case.
* setting the relation state to ready immediately if there are no more
incremental changes to be synced.
* receive the incremental changes if applicable and set the relation state
to ready without waiting.
* reuse the worker if the worker is free before trying to start a new table
sync worker
* restarting the tablesync worker only after wal_retrieve_retry_interval

~

FWIW, we just wanted to share with you the performance measurements seen
using this design#2 patch set:

==

RESULTS (not busy tests)

--
10 empty tables
2w  4w  8w  16w
HEAD:   125 119 140 133
HEAD+v28*:  92  93  123 134
%improvement:   27% 22% 12% -1%
--
100 empty tables
2w  4w  8w  16w
HEAD:   1037843 11091155
HEAD+v28*:  591 625 26162569
%improvement:   43% 26% -136%   -122%
--
1000 empty tables
2w  4w  8w  16w
HEAD:   15874   10047   991910338
HEAD+v28*:  33673   12199   90949896
%improvement:   -112%   -21%8%  4%
--
2000 empty tables
2w  4w  8w  16w
HEAD:   45266   24216   19395   19820
HEAD+v28*:  88043   21550   21668   22607
%improvement:  -95% 11% -12%-14%

~~~

Note - the results were varying quite a lot in comparison to the HEAD
e.g. HEAD results are very consistent, but the v28* results observed are not
HEAD 1000 (2w): 15861, 15777, 16007, 15950, 15886, 15740, 15846, 15740,
15908, 15940
v28* 1000 (2w):  34214, 13679, 8792, 33289, 31976, 56071, 57042, 56163,
34058, 11969

--
Kind Regards,
Peter Smith.
Fujitsu Australia


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-15 Thread Peter Smith
On Fri, Aug 11, 2023 at 11:45 PM Melih Mutlu  wrote:
>
> Again, I couldn't reproduce the cases where you saw significantly degraded 
> performance. I wonder if I'm missing something. Did you do anything not 
> included in the test scripts you shared? Do you think v26-0001 will perform 
> 84% worse than HEAD, if you try again? I just want to be sure that it was not 
> a random thing.
> Interestingly, I also don't see an improvement in above results as big as in 
> your results when inserts/tx ratio is smaller. Even though it certainly is 
> improved in such cases.
>

TEST ENVIRONMENTS

I am running the tests on a high-spec machine:

-- NOTE: Nobody else is using this machine during our testing, so
there are no unexpected influences messing up the results.


Linix

Architecture:  x86_64
CPU(s):120
Thread(s) per core:2
Core(s) per socket:15

  totalusedfree  shared  buff/cache   available
Mem:   755G5.7G737G 49M 12G748G
Swap:  4.0G  0B4.0G

~~~

The results I am seeing are not random. HEAD+v26-0001 is consistently
worse than HEAD but only for some settings. With these settings, I see
bad results (i.e. worse than HEAD) consistently every time using the
dedicated test machine.

Hou-san also reproduced bad results using a different high-spec machine

Vignesh also reproduced bad results using just his laptop but in his
case, it did *not* occur every time. As discussed elsewhere the
problem is timing-related, so sometimes you may be lucky and sometimes
not.

~

I expect you are running everything correctly, but if you are using
just a laptop (like Vignesh) then like him you might need to try
multiple times before you can hit the problem happening in your
environment.

Anyway, in case there is some other reason you are not seeing the bad
results I have re-attached scripts and re-described every step below.

==

BUILDING

-- NOTE: I have a very minimal configuration without any
optimization/debug flags etc. See config.log

$ ./configure --prefix=/home/peter/pg_oss

-- NOTE: Of course, make sure to be running using the correct Postgres:

echo 'set environment variables for OSS work'
export PATH=/home/peter/pg_oss/bin:$PATH

-- NOTE: Be sure to do git stash or whatever so don't accidentally
build a patched version thinking it is the HEAD version
-- NOTE: Be sure to do a full clean build and apply (or don't apply
v26-0001) according to the test you wish to run.

STEPS
1. sudo make clean
2. make
3. sudo make install

==

SCRIPTS & STEPS

SCRIPTS
testrun.sh
do_one_test_setup.sh
do_one_test_PUB.sh
do_one_test_SUB.sh

---

STEPS

Step-1. Edit the testrun.sh

tables=( 100 )
workers=( 2 4 8 16 )
size="0"
prefix="0816headbusy" <-- edit to differentiate each test run

~

Step-2. Edit the do_one_test_PUB.sh
IF commit_counter = 1000 THEN <-- edit this if needed. I wanted 1000
inserts/tx so nothing to do

~

Step-3: Check nothing else is running. If yes, then clean it up
[peter@localhost testing_busy]$ ps -eaf | grep postgres
peter111924 100103  0 19:31 pts/000:00:00 grep --color=auto postgres

~

Step-4: Run the tests
[peter@localhost testing_busy]$ ./testrun.sh
num_tables=100, size=0, num_workers=2, run #1 <-- check the echo
matched the config you set in the Setp-1
waiting for server to shut down done
server stopped
waiting for server to shut down done
server stopped
num_tables=100, size=0, num_workers=2, run #2
waiting for server to shut down done
server stopped
waiting for server to shut down done
server stopped
num_tables=100, size=0, num_workers=2, run #3
...

~

Step-5: Sanity check
When the test completes the current folder will be full of .log and .dat* files.
Check for sanity that no errors happened

[peter@localhost testing_busy]$ cat *.log | grep ERROR
[peter@localhost testing_busy]$

~

Step-6: Collect the results
The results are output (by the do_one_test_SUB.sh) into the *.dat_SUB files
Use grep to extract them

[peter@localhost testing_busy]$ cat 0816headbusy_100t_0_2w_*.dat_SUB |
grep RESULT | grep -v duration | awk '{print $3}'
11742.019
12157.355
11773.807
11582.981
12220.962
12546.325
12210.713
12614.892
12015.489
13527.05

Repeat grep for other files:
$ cat 0816headbusy_100t_0_4w_*.dat_SUB | grep RESULT | grep -v
duration | awk '{print $3}'
$ cat 0816headbusy_100t_0_8w_*.dat_SUB | grep RESULT | grep -v
duration | awk '{print $3}'
$ cat 0816headbusy_100t_0_16w_*.dat_SUB | grep RESULT | grep -v
duration | awk '{print $3}'

~

Step-7: Summarise the results
Now I just cut/paste the results from Step-6 into a spreadsheet and
report the median of the runs.

For example, for the above HEAD run, it was:
 2w4w   8w  16w
1   11742   5996   1919   1582
2   12157   5960   1871   1469
3   11774   5926   2101   1571
4   11583   6155   1883   1671
5   12221   6310   1895   1707
6   

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-15 Thread Peter Smith
Here is another review comment about patch v26-0001.

The tablesync worker processes include the 'relid' as part of their
name. See launcher.c:

snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication tablesync worker for subscription %u sync %u",
subid,
relid);

~~

And if that worker is "reused" by v26-0001 to process another relation
there is a LOG

if (reuse_worker)
ereport(LOG,
errmsg("logical replication table synchronization worker for
subscription \"%s\" will be reused to sync table \"%s\" with relid
%u.",
MySubscription->name,
get_rel_name(MyLogicalRepWorker->relid),
MyLogicalRepWorker->relid));


AFAICT, when being "reused" the original process name remains
unchanged, and so I think it will continue to appear to any user
looking at it that the tablesync process is just taking a very long
time handling the original 'relid'.

Won't the stale process name cause confusion to the users?

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-14 Thread vignesh C
On Thu, 10 Aug 2023 at 10:16, Amit Kapila  wrote:
>
> On Wed, Aug 9, 2023 at 8:28 AM Zhijie Hou (Fujitsu)
>  wrote:
> >
> > On Thursday, August 3, 2023 7:30 PM Melih Mutlu   
> > wrote:
> >
> > > Right. I attached the v26 as you asked.
> >
> > Thanks for posting the patches.
> >
> > While reviewing the patch, I noticed one rare case that it's possible that 
> > there
> > are two table sync worker for the same table in the same time.
> >
> > The patch relies on LogicalRepWorkerLock to prevent concurrent access, but 
> > the
> > apply worker will start a new worker after releasing the lock. So, at the 
> > point[1]
> > where the lock is released and the new table sync worker has not been 
> > started,
> > it seems possible that another old table sync worker will be reused for the
> > same table.
> >
> > /* Now safe to release the LWLock */
> > LWLockRelease(LogicalRepWorkerLock);
> > *[1]
> > /*
> >  * If there are free sync worker slot(s), 
> > start a new sync
> >  * worker for the table.
> >  */
> > if (nsyncworkers < 
> > max_sync_workers_per_subscription)
> > ...
> > 
> > logicalrep_worker_launch(MyLogicalRepWorker->dbid,
> >
>
> Yeah, this is a problem. I think one idea to solve this is by
> extending the lock duration till we launch the tablesync worker but we
> should also consider changing this locking scheme such that there is a
> better way to indicate that for a particular rel, tablesync is in
> progress. Currently, the code in TablesyncWorkerMain() also acquires
> the lock in exclusive mode even though the tablesync for a rel is in
> progress which I guess could easily heart us for larger values of
> max_logical_replication_workers. So, that could be another motivation
> to think for a different locking scheme.

There are couple of ways in which this issue can be solved:
Approach #1) check that the reuse worker has not picked up this table
for table sync from logicalrep_worker_launch while holding a lock on
LogicalRepWorkerLock, if the reuse worker has already picked it up for
processing, simply ignore it and return, nothing has to be done by the
launcher in this case.
Approach #2) a) Applyworker to create a shared memory of all the
relations that need to be synced, b) tablesync worker to take a lock
on this shared memory and pick the next table to be
processed(tablesync worker need not get the subscription relations
again and again) c) tablesync worker to update the status in shared
memory for the relation(since the lock is held there will be no
concurrency issues), also mark the start time in the shared memory,
this will help in not to restart the failed table before
wal_retrieve_retry_interval has expired d) tablesync worker to sync
the table e) subscription relation will be marked as ready and the
tablesync worker to remove the entry from shared memory f) Applyworker
will periodically synchronize the shared memory relations to keep it
in sync with the fetched subscription relation tables  g) when apply
worker exits, the shared memory will be cleared.

Approach #2) will also help in solving the other issue reported by Amit at [1].
I felt we can use Approach #2 to solve the problem as it solves both
the reported issues and also there is an added advantage where the
re-use table sync worker need not scan the pg_subscription_rel to get
the non-ready table for every run, instead we can use the list
prepared by apply worker.
Thoughts?

[1] - 
https://www.postgresql.org/message-id/CAA4eK1KyHfVOVeio28p8CHDnuyKuej78cj_7U9mHAB4ictVQwQ%40mail.gmail.com

Regards,
Vignesh




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-14 Thread Amit Kapila
On Thu, Aug 10, 2023 at 10:15 AM Amit Kapila  wrote:
>
> On Wed, Aug 9, 2023 at 8:28 AM Zhijie Hou (Fujitsu)
>  wrote:
> >
> > On Thursday, August 3, 2023 7:30 PM Melih Mutlu   
> > wrote:
> >
> > > Right. I attached the v26 as you asked.
> >
> > Thanks for posting the patches.
> >
> > While reviewing the patch, I noticed one rare case that it's possible that 
> > there
> > are two table sync worker for the same table in the same time.
> >
> > The patch relies on LogicalRepWorkerLock to prevent concurrent access, but 
> > the
> > apply worker will start a new worker after releasing the lock. So, at the 
> > point[1]
> > where the lock is released and the new table sync worker has not been 
> > started,
> > it seems possible that another old table sync worker will be reused for the
> > same table.
> >
> > /* Now safe to release the LWLock */
> > LWLockRelease(LogicalRepWorkerLock);
> > *[1]
> > /*
> >  * If there are free sync worker slot(s), 
> > start a new sync
> >  * worker for the table.
> >  */
> > if (nsyncworkers < 
> > max_sync_workers_per_subscription)
> > ...
> > 
> > logicalrep_worker_launch(MyLogicalRepWorker->dbid,
> >
>
> Yeah, this is a problem. I think one idea to solve this is by
> extending the lock duration till we launch the tablesync worker but we
> should also consider changing this locking scheme such that there is a
> better way to indicate that for a particular rel, tablesync is in
> progress. Currently, the code in TablesyncWorkerMain() also acquires
> the lock in exclusive mode even though the tablesync for a rel is in
> progress which I guess could easily heart us for larger values of
> max_logical_replication_workers. So, that could be another motivation
> to think for a different locking scheme.
>

Yet another problem is that currently apply worker maintains a hash
table for 'last_start_times' to avoid restarting the tablesync worker
immediately upon error. The same functionality is missing while
reusing the table sync worker. One possibility is to use a shared hash
table to remember start times but I think it may depend on what we
decide to solve the previous problem reported by Hou-San.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-12 Thread Amit Kapila
On Fri, Aug 11, 2023 at 7:15 PM Melih Mutlu  wrote:
>
> Peter Smith , 11 Ağu 2023 Cum, 01:26 tarihinde şunu 
> yazdı:
>>
>> No, I meant what I wrote there. When I ran the tests the HEAD included
>> the v25-0001 refactoring patch, but v26 did not yet exist.
>>
>> For now, we are only performance testing the first
>> "Reuse-Tablesyc-Workers" patch, but not yet including the second patch
>> ("Reuse connection when...").
>>
>> Note that those "Reuse-Tablesyc-Workers" patches v24-0002 and v26-0001
>> are equivalent because there are only cosmetic log message differences
>> between them.
>
>
> Ok, that's fair.
>
>
>>
>> So, my testing was with HEAD+v24-0002 (but not including v24-0003).
>> Your same testing should be with HEAD+v26-0001 (but not including v26-0002).
>
>
> That's actually what I did. I should have been more clear about what I 
> included in my previous email.With v26-0002, results are noticeably better 
> anyway.
> I just rerun the test again against HEAD, HEAD+v26-0001 and additionally 
> HEAD+v26-0001+v26-0002 this time, for better comparison.
>
> Here are my results with the same scripts you shared earlier (I obviously 
> only changed the number of inserts before each commit. ).
> Note that this is when synchronous_commit = off.
>
> 100 inserts/tx
> +-+---+--+--+--+
> | | 2w| 4w   | 8w   | 16w  |
> +-+---+--+--+--+
> | v26-0002| 10421 | 6472 | 6656 | 6566 |
> +-+---+--+--+--+
> | improvement | 31%   | 12%  | 0%   | 5%   |
> +-+---+--+--+--+
> | v26-0001| 14585 | 7386 | 7129 | 7274 |
> +-+---+--+--+--+
> | improvement | 9%| 5%   | 12%  | 7%   |
> +-+---+--+--+--+
> | HEAD| 16130 | 7785 | 8147 | 7827 |
> +-+---+--+--+--+
>
> 1000 inserts/tx
> +-+---+--+--+--+
> | | 2w| 4w   | 8w   | 16w  |
> +-+---+--+--+--+
> | v26-0002| 13796 | 6848 | 5942 | 6315 |
> +-+---+--+--+--+
> | improvement | 9%| 7%   | 10%  | 8%   |
> +-+---+--+--+--+
> | v26-0001| 14685 | 7325 | 6675 | 6719 |
> +-+---+--+--+--+
> | improvement | 3%| 0%   | 0%   | 2%   |
> +-+---+--+--+--+
> | HEAD| 15118 | 7354 | 6644 | 6890 |
> +-+---+--+--+--+
>
> 2000 inserts/tx
> +-+---+---+--+--+
> | | 2w| 4w| 8w   | 16w  |
> +-+---+---+--+--+
> | v26-0002| 22442 | 9944  | 6034 | 5829 |
> +-+---+---+--+--+
> | improvement | 5%| 2%| 4%   | 10%  |
> +-+---+---+--+--+
> | v26-0001| 23632 | 10164 | 6311 | 6480 |
> +-+---+---+--+--+
> | improvement | 0%| 0%| 0%   | 0%   |
> +-+---+---+--+--+
> | HEAD| 23667 | 10157 | 6285 | 6470 |
> +-+---+---+--+--+
>
> 5000 inserts/tx
> +-+---+---+---+--+
> | | 2w| 4w| 8w| 16w  |
> +-+---+---+---+--+
> | v26-0002| 41443 | 21385 | 10832 | 6146 |
> +-+---+---+---+--+
> | improvement | 0%| 0%| 1%| 16%  |
> +-+---+---+---+--+
> | v26-0001| 41293 | 21226 | 10814 | 6158 |
> +-+---+---+---+--+
> | improvement | 0%| 1%| 1%| 15%  |
> +-+---+---+---+--+
> | HEAD| 41503 | 21466 | 10943 | 7292 |
> +-+---+---+---+--+
>
>
> Again, I couldn't reproduce the cases where you saw significantly degraded 
> performance.
>

I am not surprised to see that you don't see regression because as per
Vignesh's analysis, this is purely a timing issue where sometimes
after the patch the slot creation can take more time because there is
a constant inflow of transactions on the publisher. I think we are
seeing it because this workload is predominantly just creating and
destroying slots. We can probably improve it later as discussed
earlier by using a single for multiple copies (especially for small
tables) or something like that.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-11 Thread Melih Mutlu
Hi Peter,

Peter Smith , 11 Ağu 2023 Cum, 01:26 tarihinde şunu
yazdı:

> No, I meant what I wrote there. When I ran the tests the HEAD included
> the v25-0001 refactoring patch, but v26 did not yet exist.
>
> For now, we are only performance testing the first
> "Reuse-Tablesyc-Workers" patch, but not yet including the second patch
> ("Reuse connection when...").
>
> Note that those "Reuse-Tablesyc-Workers" patches v24-0002 and v26-0001
> are equivalent because there are only cosmetic log message differences
> between them.
>

Ok, that's fair.



> So, my testing was with HEAD+v24-0002 (but not including v24-0003).
> Your same testing should be with HEAD+v26-0001 (but not including
> v26-0002).
>

That's actually what I did. I should have been more clear about what I
included in my previous email.With v26-0002, results are noticeably better
anyway.
I just rerun the test again against HEAD, HEAD+v26-0001 and additionally
HEAD+v26-0001+v26-0002 this time, for better comparison.

Here are my results with the same scripts you shared earlier (I obviously
only changed the number of inserts before each commit. ).
Note that this is when synchronous_commit = off.

100 inserts/tx
+-+---+--+--+--+
| | 2w| 4w   | 8w   | 16w  |
+-+---+--+--+--+
| v26-0002| 10421 | 6472 | 6656 | 6566 |
+-+---+--+--+--+
| improvement | 31%   | 12%  | 0%   | 5%   |
+-+---+--+--+--+
| v26-0001| 14585 | 7386 | 7129 | 7274 |
+-+---+--+--+--+
| improvement | 9%| 5%   | 12%  | 7%   |
+-+---+--+--+--+
| HEAD| 16130 | 7785 | 8147 | 7827 |
+-+---+--+--+--+

1000 inserts/tx
+-+---+--+--+--+
| | 2w| 4w   | 8w   | 16w  |
+-+---+--+--+--+
| v26-0002| 13796 | 6848 | 5942 | 6315 |
+-+---+--+--+--+
| improvement | 9%| 7%   | 10%  | 8%   |
+-+---+--+--+--+
| v26-0001| 14685 | 7325 | 6675 | 6719 |
+-+---+--+--+--+
| improvement | 3%| 0%   | 0%   | 2%   |
+-+---+--+--+--+
| HEAD| 15118 | 7354 | 6644 | 6890 |
+-+---+--+--+--+

2000 inserts/tx
+-+---+---+--+--+
| | 2w| 4w| 8w   | 16w  |
+-+---+---+--+--+
| v26-0002| 22442 | 9944  | 6034 | 5829 |
+-+---+---+--+--+
| improvement | 5%| 2%| 4%   | 10%  |
+-+---+---+--+--+
| v26-0001| 23632 | 10164 | 6311 | 6480 |
+-+---+---+--+--+
| improvement | 0%| 0%| 0%   | 0%   |
+-+---+---+--+--+
| HEAD| 23667 | 10157 | 6285 | 6470 |
+-+---+---+--+--+

5000 inserts/tx
+-+---+---+---+--+
| | 2w| 4w| 8w| 16w  |
+-+---+---+---+--+
| v26-0002| 41443 | 21385 | 10832 | 6146 |
+-+---+---+---+--+
| improvement | 0%| 0%| 1%| 16%  |
+-+---+---+---+--+
| v26-0001| 41293 | 21226 | 10814 | 6158 |
+-+---+---+---+--+
| improvement | 0%| 1%| 1%| 15%  |
+-+---+---+---+--+
| HEAD| 41503 | 21466 | 10943 | 7292 |
+-+---+---+---+--+


Again, I couldn't reproduce the cases where you saw significantly degraded
performance. I wonder if I'm missing something. Did you do anything not
included in the test scripts you shared? Do you think v26-0001 will
perform 84% worse than HEAD, if you try again? I just want to be sure that
it was not a random thing.
Interestingly, I also don't see an improvement in above results as big as
in your results when inserts/tx ratio is smaller. Even though it certainly
is improved in such cases.

Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-11 Thread vignesh C
On Fri, 11 Aug 2023 at 16:26, vignesh C  wrote:
>
> On Wed, 9 Aug 2023 at 09:51, vignesh C  wrote:
> >
> > Hi Melih,
> >
> > Here is a patch to help in getting the execution at various phases
> > like: a) replication slot creation time, b) Wal reading c) Number of
> > WAL records read d) subscription relation state change etc
> > Couple of observation while we tested with this patch:
> > 1) We noticed that the patch takes more time for finding the decoding
> > start point.
> > 2) Another observation was that the number of XLOG records read for
> > identify the consistent point was significantly high with the v26_0001
> > patch.
> >
> > HEAD
> > postgres=# select avg(counttime)/1000 "avgtime(ms)",
> > median(counttime)/1000 "median(ms)", min(counttime)/1000
> > "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> > group by logtype;
> >   avgtime(ms)   |   median(ms)   | mintime(ms) |
> > maxtime(ms) | logtype
> > ++-+-+--
> >  0.00579245283018867920 | 0.0020 |   0 |
> > 1 | SNAPSHOT_BUILD
> >  1.2246811320754717 | 0.9855 |   0 |
> >37 | LOGICAL_SLOT_CREATION
> >171.0863283018867920 |   183.9120 |   0 |
> >   408 | FIND_DECODING_STARTPOINT
> >  2.0699433962264151 | 1.4380 |   1 |
> >49 | INIT_DECODING_CONTEXT
> > (4 rows)
> >
> > HEAD + v26-0001 patch
> > postgres=# select avg(counttime)/1000 "avgtime(ms)",
> > median(counttime)/1000 "median(ms)", min(counttime)/1000
> > "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> > group by logtype;
> >   avgtime(ms)   |   median(ms)   | mintime(ms) |
> > maxtime(ms) | logtype
> > ++-+-+--
> >  0.00588113207547169810 | 0.0050 |   0 |
> > 0 | SNAPSHOT_BUILD
> >  1.1270962264150943 | 1.1000 |   0 |
> > 2 | LOGICAL_SLOT_CREATION
> >301.1745528301886790 |   410.4870 |   0 |
> >   427 | FIND_DECODING_STARTPOINT
> >  1.4814660377358491 | 1.4530 |   1 |
> > 9 | INIT_DECODING_CONTEXT
> > (4 rows)
> >
> > In the above FIND_DECODING_STARTPOINT is very much higher with V26-0001 
> > patch.
> >
> > HEAD
> > FIND_DECODING_XLOG_RECORD_COUNT
> > - average =  2762
> > - median = 3362
> >
> > HEAD + reuse worker patch(v26_0001 patch)
> > Where FIND_DECODING_XLOG_RECORD_COUNT
> > - average =  4105
> > - median = 5345
> >
> > Similarly Number of xlog records read is higher with v26_0001 patch.
> >
> > Steps to calculate the timing:
> > -- first collect the necessary LOG from subscriber's log.
> > cat *.log | grep -E
> > '(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)'
> > > grep.dat
> >
> > create table testv26(logtime varchar, pid varchar, level varchar,
> > space varchar, logtype varchar, counttime int);
> > -- then copy these datas into db table to count the avg number.
> > COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' ';
> >
> > -- Finally, use the SQL to analyze the data:
> > select avg(counttime)/1000 "avgtime(ms)", logtype from testv26 group by 
> > logtype;
> >
> > --- To get the number of xlog records read:
> > select avg(counttime) from testv26 where logtype
> > ='FIND_DECODING_XLOG_RECORD_COUNT' and counttime != 1;
> >
> > Thanks to Peter and Hou-san who helped in finding these out. We are
> > parallely analysing this, @Melih Mutlu  posting this information so
> > that it might help you too in analysing this issue.
>
> I analysed further on why it needs to read a larger number of XLOG
> records in some cases while creating the replication slot, here are my
> thoughts:
> Note: Tablesync worker needs to connect to the publisher and create
> consistent point for the slots by reading the XLOG records. This
> requires that all the open transactions and the transactions that are
> created while creating consistent point should be committed.
> I feel the creation of slots is better in few cases in Head because:
> Publisher| Subscriber
> 
> Begin txn1 transaction|
> Insert 1..1000 records|
> Commit   |
> Begin txn2 transaction|
> Insert 1..1000 records |  Apply worker applies transaction txn1
> |  Start tablesync table t2
> |  create consistent point in
> | publisher before transaction txn3 is
> | started
> commit|  We just need 

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-11 Thread vignesh C
On Wed, 9 Aug 2023 at 09:51, vignesh C  wrote:
>
> Hi Melih,
>
> Here is a patch to help in getting the execution at various phases
> like: a) replication slot creation time, b) Wal reading c) Number of
> WAL records read d) subscription relation state change etc
> Couple of observation while we tested with this patch:
> 1) We noticed that the patch takes more time for finding the decoding
> start point.
> 2) Another observation was that the number of XLOG records read for
> identify the consistent point was significantly high with the v26_0001
> patch.
>
> HEAD
> postgres=# select avg(counttime)/1000 "avgtime(ms)",
> median(counttime)/1000 "median(ms)", min(counttime)/1000
> "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> group by logtype;
>   avgtime(ms)   |   median(ms)   | mintime(ms) |
> maxtime(ms) | logtype
> ++-+-+--
>  0.00579245283018867920 | 0.0020 |   0 |
> 1 | SNAPSHOT_BUILD
>  1.2246811320754717 | 0.9855 |   0 |
>37 | LOGICAL_SLOT_CREATION
>171.0863283018867920 |   183.9120 |   0 |
>   408 | FIND_DECODING_STARTPOINT
>  2.0699433962264151 | 1.4380 |   1 |
>49 | INIT_DECODING_CONTEXT
> (4 rows)
>
> HEAD + v26-0001 patch
> postgres=# select avg(counttime)/1000 "avgtime(ms)",
> median(counttime)/1000 "median(ms)", min(counttime)/1000
> "mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
> group by logtype;
>   avgtime(ms)   |   median(ms)   | mintime(ms) |
> maxtime(ms) | logtype
> ++-+-+--
>  0.00588113207547169810 | 0.0050 |   0 |
> 0 | SNAPSHOT_BUILD
>  1.1270962264150943 | 1.1000 |   0 |
> 2 | LOGICAL_SLOT_CREATION
>301.1745528301886790 |   410.4870 |   0 |
>   427 | FIND_DECODING_STARTPOINT
>  1.4814660377358491 | 1.4530 |   1 |
> 9 | INIT_DECODING_CONTEXT
> (4 rows)
>
> In the above FIND_DECODING_STARTPOINT is very much higher with V26-0001 patch.
>
> HEAD
> FIND_DECODING_XLOG_RECORD_COUNT
> - average =  2762
> - median = 3362
>
> HEAD + reuse worker patch(v26_0001 patch)
> Where FIND_DECODING_XLOG_RECORD_COUNT
> - average =  4105
> - median = 5345
>
> Similarly Number of xlog records read is higher with v26_0001 patch.
>
> Steps to calculate the timing:
> -- first collect the necessary LOG from subscriber's log.
> cat *.log | grep -E
> '(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)'
> > grep.dat
>
> create table testv26(logtime varchar, pid varchar, level varchar,
> space varchar, logtype varchar, counttime int);
> -- then copy these datas into db table to count the avg number.
> COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' ';
>
> -- Finally, use the SQL to analyze the data:
> select avg(counttime)/1000 "avgtime(ms)", logtype from testv26 group by 
> logtype;
>
> --- To get the number of xlog records read:
> select avg(counttime) from testv26 where logtype
> ='FIND_DECODING_XLOG_RECORD_COUNT' and counttime != 1;
>
> Thanks to Peter and Hou-san who helped in finding these out. We are
> parallely analysing this, @Melih Mutlu  posting this information so
> that it might help you too in analysing this issue.

I analysed further on why it needs to read a larger number of XLOG
records in some cases while creating the replication slot, here are my
thoughts:
Note: Tablesync worker needs to connect to the publisher and create
consistent point for the slots by reading the XLOG records. This
requires that all the open transactions and the transactions that are
created while creating consistent point should be committed.
I feel the creation of slots is better in few cases in Head because:
Publisher| Subscriber

Begin txn1 transaction|
Insert 1..1000 records|
Commit   |
Begin txn2 transaction|
Insert 1..1000 records |  Apply worker applies transaction txn1
|  Start tablesync table t2
|  create consistent point in
| publisher before transaction txn3 is
| started
commit|  We just need to wait till
| transaction txn2 is finished.
Begin txn3 transaction|
Insert 1..1000 records |
commit|

In V26, this is happening in some cases:
Publisher| Subscriber

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-10 Thread Peter Smith
On Fri, Aug 11, 2023 at 12:54 AM Melih Mutlu  wrote:
>
> Hi Peter and Vignesh,
>
> Peter Smith , 7 Ağu 2023 Pzt, 09:25 tarihinde şunu 
> yazdı:
>>
>> Hi Melih.
>>
>> Now that the design#1 ERRORs have been fixed, we returned to doing
>> performance measuring of the design#1 patch versus HEAD.
>
>
> Thanks a lot for taking the time to benchmark the patch. It's really helpful.
>
>> Publisher "busy" table does commit every 1000 inserts:
>> 2w 4w 8w 16w
>> HEAD 11898 5855 1868 1631
>> HEAD+v24-0002 21905 8254 3531 1626
>> %improvement -84% -41% -89% 0%
>>
>>
>> ^ Note - design#1 was slower than HEAD here
>>
>>
>> ~
>>
>>
>> Publisher "busy" table does commit every 2000 inserts:
>> 2w 4w 8w 16w
>> HEAD 21740 7109 3454 1703
>> HEAD+v24-0002 21585 10877 4779 2293
>> %improvement 1% -53% -38% -35%
>
>
> I assume you meant HEAD+v26-0002 and not v24. I wanted to quickly reproduce 
> these two cases where the patch was significantly worse. Interestingly my 
> results are a bit different than yours.
>

No, I meant what I wrote there. When I ran the tests the HEAD included
the v25-0001 refactoring patch, but v26 did not yet exist.

For now, we are only performance testing the first
"Reuse-Tablesyc-Workers" patch, but not yet including the second patch
("Reuse connection when...").

Note that those "Reuse-Tablesyc-Workers" patches v24-0002 and v26-0001
are equivalent because there are only cosmetic log message differences
between them.
So, my testing was with HEAD+v24-0002 (but not including v24-0003).
Your same testing should be with HEAD+v26-0001 (but not including v26-0002).

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-10 Thread Melih Mutlu
Hi Peter and Vignesh,

Peter Smith , 7 Ağu 2023 Pzt, 09:25 tarihinde şunu
yazdı:

> Hi Melih.
>
> Now that the design#1 ERRORs have been fixed, we returned to doing
> performance measuring of the design#1 patch versus HEAD.


Thanks a lot for taking the time to benchmark the patch. It's really
helpful.

Publisher "busy" table does commit every 1000 inserts:
> 2w 4w 8w 16w
> HEAD 11898 5855 1868 1631
> HEAD+v24-0002 21905 8254 3531 1626
> %improvement -84% -41% -89% 0%


> ^ Note - design#1 was slower than HEAD here


> ~


> Publisher "busy" table does commit every 2000 inserts:
> 2w 4w 8w 16w
> HEAD 21740 7109 3454 1703
> HEAD+v24-0002 21585 10877 4779 2293
> %improvement 1% -53% -38% -35%


I assume you meant HEAD+v26-0002 and not v24. I wanted to quickly reproduce
these two cases where the patch was significantly worse. Interestingly my
results are a bit different than yours.

Publisher "busy" table does commit every 1000 inserts:
2w 4w 8w 16w
HEAD 22405 10335 5008 3304
HEAD+v26  19954  8037 4068 2761
%improvement 1% 2% 2% 1%

Publisher "busy" table does commit every 2000 inserts:
2w 4w 8w 16w
HEAD 33122 14220 7251 4279
HEAD+v26 34248 16213 7356 3914
%improvement 0% -1% 0% 1%

If I'm not doing something wrong in testing (or maybe the patch doesn't
perform reliable yet for some reason), I don't see a drastic change in
performance. But I guess the patch is supposed to perform better than HEAD
in these both cases anyway. right?. I would expect the performance of the
patch to converge to HEAD's performance with large tables. But I'm not sure
what to expect when apply worker is busy with large transactions.

However, I need to investigate a bit more what Vignesh shared earlier [1].
It makes sense that those issues can cause this problem here.

It just takes a bit of time for me to figure out these things, but I'm
working on it.

[1]
https://www.postgresql.org/message-id/CALDaNm1TA068E2niJFUR9ig%2BYz3-ank%3Dj5%3Dj-2UocbzaDnQPrA%40mail.gmail.com



Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-09 Thread Peter Smith
Hi Melih,

FYI -- The same testing was repeated but this time PG was configured
to say synchronous_commit=on. Other factors and scripts were the same
as before --- busy apply, 5 runs, 4 workers, 1000 inserts/tx, 100
empty tables, etc.

There are still more xlog records seen for the v26 patch, but now the
v26 performance was better than HEAD.

RESULTS (synchronous_commit=on)
---

Xlog Counts

HEAD
postgres=# select avg(counttime) "avg", median(counttime) "median",
min(counttime) "min", max(counttime) "max", logtype from test_head
group by logtype;
  avg  |median | min | max  |
   logtype
---+---+-+--+---
---+---+-+--+---
---+---+-+--+---
1253.7509433962264151 | 1393. |   1 | 2012 |
FIND_DECODING_XLOG_RECORD_COUNT
(1 row)


HEAD+v26-0001
postgres=# select avg(counttime) "avg", median(counttime) "median",
min(counttime) "min", max(counttime) "max", logtype from test_v26
group by logtype;
  avg  |median | min | max  |
   logtype
---+---+-+--+---
---+---+-+--+---
---+---+-+--+---
1278.4075471698113208 | 1423.5000 |   1 | 2015 |
FIND_DECODING_XLOG_RECORD_COUNT
(1 row)

~~

Performance

HEAD
[peter@localhost res_0809_vignesh_timing_sync_head]$ cat *.dat_SUB |
grep RESULT | grep -v duration | awk '{print $3}'
4014.266
3892.089
4195.318
3571.862
4312.183


HEAD+v26-0001
[peter@localhost res_0809_vignesh_timing_sync_v260001]$ cat *.dat_SUB
| grep RESULT | grep -v duration | awk '{print $3}'
3326.627
3213.028
3433.611
3299.803
3258.821

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-09 Thread Amit Kapila
On Wed, Aug 9, 2023 at 8:28 AM Zhijie Hou (Fujitsu)
 wrote:
>
> On Thursday, August 3, 2023 7:30 PM Melih Mutlu   
> wrote:
>
> > Right. I attached the v26 as you asked.
>
> Thanks for posting the patches.
>
> While reviewing the patch, I noticed one rare case that it's possible that 
> there
> are two table sync worker for the same table in the same time.
>
> The patch relies on LogicalRepWorkerLock to prevent concurrent access, but the
> apply worker will start a new worker after releasing the lock. So, at the 
> point[1]
> where the lock is released and the new table sync worker has not been started,
> it seems possible that another old table sync worker will be reused for the
> same table.
>
> /* Now safe to release the LWLock */
> LWLockRelease(LogicalRepWorkerLock);
> *[1]
> /*
>  * If there are free sync worker slot(s), 
> start a new sync
>  * worker for the table.
>  */
> if (nsyncworkers < 
> max_sync_workers_per_subscription)
> ...
> 
> logicalrep_worker_launch(MyLogicalRepWorker->dbid,
>

Yeah, this is a problem. I think one idea to solve this is by
extending the lock duration till we launch the tablesync worker but we
should also consider changing this locking scheme such that there is a
better way to indicate that for a particular rel, tablesync is in
progress. Currently, the code in TablesyncWorkerMain() also acquires
the lock in exclusive mode even though the tablesync for a rel is in
progress which I guess could easily heart us for larger values of
max_logical_replication_workers. So, that could be another motivation
to think for a different locking scheme.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-08 Thread vignesh C
Hi Melih,

Here is a patch to help in getting the execution at various phases
like: a) replication slot creation time, b) Wal reading c) Number of
WAL records read d) subscription relation state change etc
Couple of observation while we tested with this patch:
1) We noticed that the patch takes more time for finding the decoding
start point.
2) Another observation was that the number of XLOG records read for
identify the consistent point was significantly high with the v26_0001
patch.

HEAD
postgres=# select avg(counttime)/1000 "avgtime(ms)",
median(counttime)/1000 "median(ms)", min(counttime)/1000
"mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
group by logtype;
  avgtime(ms)   |   median(ms)   | mintime(ms) |
maxtime(ms) | logtype
++-+-+--
 0.00579245283018867920 | 0.0020 |   0 |
1 | SNAPSHOT_BUILD
 1.2246811320754717 | 0.9855 |   0 |
   37 | LOGICAL_SLOT_CREATION
   171.0863283018867920 |   183.9120 |   0 |
  408 | FIND_DECODING_STARTPOINT
 2.0699433962264151 | 1.4380 |   1 |
   49 | INIT_DECODING_CONTEXT
(4 rows)

HEAD + v26-0001 patch
postgres=# select avg(counttime)/1000 "avgtime(ms)",
median(counttime)/1000 "median(ms)", min(counttime)/1000
"mintime(ms)", max(counttime)/1000 "maxtime(ms)", logtype from test
group by logtype;
  avgtime(ms)   |   median(ms)   | mintime(ms) |
maxtime(ms) | logtype
++-+-+--
 0.00588113207547169810 | 0.0050 |   0 |
0 | SNAPSHOT_BUILD
 1.1270962264150943 | 1.1000 |   0 |
2 | LOGICAL_SLOT_CREATION
   301.1745528301886790 |   410.4870 |   0 |
  427 | FIND_DECODING_STARTPOINT
 1.4814660377358491 | 1.4530 |   1 |
9 | INIT_DECODING_CONTEXT
(4 rows)

In the above FIND_DECODING_STARTPOINT is very much higher with V26-0001 patch.

HEAD
FIND_DECODING_XLOG_RECORD_COUNT
- average =  2762
- median = 3362

HEAD + reuse worker patch(v26_0001 patch)
Where FIND_DECODING_XLOG_RECORD_COUNT
- average =  4105
- median = 5345

Similarly Number of xlog records read is higher with v26_0001 patch.

Steps to calculate the timing:
-- first collect the necessary LOG from subscriber's log.
cat *.log | grep -E
'(LOGICAL_SLOT_CREATION|INIT_DECODING_CONTEXT|FIND_DECODING_STARTPOINT|SNAPSHOT_BUILD|FIND_DECODING_XLOG_RECORD_COUNT|LOGICAL_XLOG_READ|LOGICAL_DECODE_PROCESS_RECORD|LOGICAL_WAIT_TRANSACTION)'
> grep.dat

create table testv26(logtime varchar, pid varchar, level varchar,
space varchar, logtype varchar, counttime int);
-- then copy these datas into db table to count the avg number.
COPY testv26 FROM '/home/logs/grep.dat' DELIMITER ' ';

-- Finally, use the SQL to analyze the data:
select avg(counttime)/1000 "avgtime(ms)", logtype from testv26 group by logtype;

--- To get the number of xlog records read:
select avg(counttime) from testv26 where logtype
='FIND_DECODING_XLOG_RECORD_COUNT' and counttime != 1;

Thanks to Peter and Hou-san who helped in finding these out. We are
parallely analysing this, @Melih Mutlu  posting this information so
that it might help you too in analysing this issue.

Regards,
Vignesh
From b755cab38ff76e9f63304b2d8f344cb098ca6a33 Mon Sep 17 00:00:00 2001
From: Hou Zhijie 
Date: Fri, 4 Aug 2023 17:57:29 +0800
Subject: [PATCH v1 1/2] count state change time

---
 src/backend/replication/logical/tablesync.c | 28 +
 1 file changed, 28 insertions(+)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a775065..0d9298f7b3 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -123,6 +123,10 @@
 #include "utils/syscache.h"
 #include "utils/usercontext.h"
 
+static TimestampTz start = 0;
+static long		secs = 0;
+static int			microsecs = 0;
+
 static bool table_states_valid = false;
 static List *table_states_not_ready = NIL;
 static bool FetchTableStates(bool *started_tx);
@@ -338,6 +342,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
 
 		CommitTransactionCommand();
+
+		TimestampDifference(start, GetCurrentTimestamp(), , );
+		elog(LOG, "SUBREL_STATE_SYNCDONE %d", ((int) secs * 100 + microsecs));
+		start = GetCurrentTimestamp();
+
 		pgstat_report_stat(false);
 
 		/*
@@ -1258,6 +1267,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	bool		must_use_password;
 	bool		run_as_owner;
 
+	start = GetCurrentTimestamp();
+
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
@@ -1361,6 

RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-08 Thread Zhijie Hou (Fujitsu)
On Thursday, August 3, 2023 7:30 PM Melih Mutlu   wrote:

> Right. I attached the v26 as you asked. 

Thanks for posting the patches.
 
While reviewing the patch, I noticed one rare case that it's possible that there
are two table sync worker for the same table in the same time.

The patch relies on LogicalRepWorkerLock to prevent concurrent access, but the
apply worker will start a new worker after releasing the lock. So, at the 
point[1]
where the lock is released and the new table sync worker has not been started,
it seems possible that another old table sync worker will be reused for the
same table.

/* Now safe to release the LWLock */
LWLockRelease(LogicalRepWorkerLock);
*[1]
/*
 * If there are free sync worker slot(s), start 
a new sync
 * worker for the table.
 */
if (nsyncworkers < 
max_sync_workers_per_subscription)
...

logicalrep_worker_launch(MyLogicalRepWorker->dbid,

I can reproduce it by using gdb.

Steps:
1. set max_sync_workers_per_subscription to 1 and setup pub/sub which publishes
   two tables(table A and B).
2. when the table sync worker for the table A started, use gdb to block it
   before being reused for another table.
3. set max_sync_workers_per_subscription to 2 and use gdb to block the apply
   worker at the point after releasing the LogicalRepWorkerLock and before
   starting another table sync worker for table B.
4. release the blocked table sync worker, then we can see the table sync worker
   is also reused for table B.
5. release the apply worker, then we can see the apply worker will start
   another table sync worker for the same table(B).

I think it would be better to prevent this case from happening as this case
will give some unexpected ERROR or LOG. Note that I haven't checked if it would
cause worse problems like duplicate copy or others.

Best Regards,
Hou zj


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-07 Thread Peter Smith
Hi Melih.

Now that the design#1 ERRORs have been fixed, we returned to doing
performance measuring of the design#1 patch versus HEAD.

Unfortunately, we observed that under some particular conditions
(large transactions of 1000 inserts/tx for a busy apply worker, 100
empty tables to be synced) the performance was worse with the design#1
patch applied.

~~

RESULTS

Below are some recent measurements (for 100 empty tables to be synced
when apply worker is already busy). We vary the size of the published
transaction for the "busy" table, and you can see that for certain
large transaction sizes (1000 and 2000 inserts/tx) the design#1
performance was worse than HEAD:

~

The publisher "busy" table does commit every 10 inserts:
2w 4w 8w 16w
HEAD 3945 1138 1166 1205
HEAD+v24-0002 3559 886 355 490
%improvement 10% 22% 70% 59%

~

The publisher "busy" table does commit every 100 inserts:
2w 4w 8w 16w
HEAD 2363 1357 1354 1355
HEAD+v24-0002 2077 1358 762 756
%improvement 12% 0% 44% 44%

~

Publisher "busy" table does commit every 1000 inserts:
2w 4w 8w 16w
HEAD 11898 5855 1868 1631
HEAD+v24-0002 21905 8254 3531 1626
%improvement -84% -41% -89% 0%

^ Note - design#1 was slower than HEAD here

~

Publisher "busy" table does commit every 2000 inserts:
2w 4w 8w 16w
HEAD 21740 7109 3454 1703
HEAD+v24-0002 21585 10877 4779 2293
%improvement 1% -53% -38% -35%

^ Note - design#1 was slower than HEAD here

~

The publisher "busy" table does commit every 5000 inserts:
2w 4w 8w 16w
HEAD 36094 18105 8595 3567
HEAD+v24-0002 36305 18199 8151 3710
%improvement -1% -1% 5% -4%

~

The publisher "busy" table does commit every 1 inserts:
2w 4w 8w 16w
HEAD 38077 18406 9426 5559
HEAD+v24-0002 36763 18027 8896 4166
%improvement 3% 2% 6% 25%

--

TEST SCRIPTS

The "busy apply" test scripts are basically the same as already posted
[1], but I have reattached the latest ones again anyway.

--
[1] 
https://www.postgresql.org/message-id/CAHut%2BPuNVNK2%2BA%2BR6eV8rKPNBHemCFE4NDtEYfpXbYr6SsvvBg%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia
#!/bin/bash
#
# SUB
#
# First argument : number of tables
# Second argument : size[Byte] of each tables
# Third argument : max_sync_workers
# Fourth argument : execution numbers
#

port_pub=5431
data_pub=datapub

port_sub=5432
data_sub=datasub

echo ''
echo '# Check configurations #'
echo ''

declare num_tables
if [ -n "$1" ]; then
num_tables=$1
else
num_tables=10
fi

echo "$num_tables tables will be used while testing"

declare table_size
if [ -n "$2" ]; then
table_size=$2
else
table_size=0
fi

num_sync_workers=$3
run_no=$4

#
# Convert from table_size to number of tuples. The equation was
# found by my tests...
#

declare num_tuples
if [ $table_size == "10kB" ]
then
num_tuples=3250
else
num_tuples=0
fi

echo "$num_tuples tuples will be inserted to each tables"


echo '##'
echo '# IPC at subscriber-side #'
echo '##'

psql -U postgres -p $port_sub -a -c "CREATE SUBSCRIPTION ipc_from_publisher CONNECTION 'host=localhost user=postgres port=$port_pub' PUBLICATION ipc_at_publisher WITH(origin=NONE);"
psql -U postgres -p $port_sub -a -c "CREATE PUBLICATION ipc_at_subscriber FOR TABLE ipc;"

# wait a bit for the publisher-side to connect to this publication
sleep 5s

psql -U postgres -p $port_sub -a -c "INSERT INTO ipc VALUES('sub ipc ready');"
psql -U postgres -p $port_sub -a -c "CALL ipc_wait_for('pub ipc ready');"

echo '#'
echo '# Create tables #'
echo '#'
(
echo "CREATE TABLE busy_tbl(a text);"

echo "CREATE SCHEMA test_tables;"
echo -e "SELECT 'CREATE TABLE test_tables.manytables_'||i||'(i int);' FROM generate_series(1, $num_tables) g(i) \gexec"

) | psql -U postgres -p $port_sub -a

echo '##'
echo '# Create subscription for busy table #'
echo '##'
(
echo "CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost user=postgres port=$port_pub' PUBLICATION mypub;"
echo "INSERT INTO ipc VALUES ('mysub is created');"

) | psql -U postgres -p $port_sub -a


echo ''
echo '# Test #'
echo ''
(
echo -e "CREATE OR REPLACE PROCEDURE log_rep_test(max INTEGER) AS \$\$
DECLARE
total_duration INTERVAL := '0';
avg_duration FLOAT := 0.0;
start_time TIMESTAMP;
end_time TIMESTAMP;
BEGIN
start_time := clock_timestamp();

-- time how long it takes for all the tablesyncs to become "ready"
WHILE EXISTS (SELECT 1 FROM pg_subscription_rel WHERE srsubstate != 'r') LOOP
COMMIT;
END LOOP;

end_time := clock_timestamp();

total_duration := total_duration + (end_time - start_time);

IF max > 0 THEN
avg_duration := EXTRACT(EPOCH FROM total_duration) / max * 1000;
END IF;

RAISE NOTICE 'RESULT: %', avg_duration;
   

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-03 Thread Peter Smith
FWIW, I confirmed that my review comments for v22* have all been
addressed in the latest v26* patches.

Thanks!

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-03 Thread Melih Mutlu
Hi,

Amit Kapila , 3 Ağu 2023 Per, 09:22 tarihinde şunu
yazdı:

> On Thu, Aug 3, 2023 at 9:35 AM Peter Smith  wrote:
> > I checked the latest patch v25-0001.
> >
> > LGTM.
> >
>
> Thanks, I have pushed 0001. Let's focus on the remaining patches.
>

Thanks!

Peter Smith , 3 Ağu 2023 Per, 12:06 tarihinde şunu
yazdı:

> Just to clarify my previous post, I meant we will need new v26* patches
>

Right. I attached the v26 as you asked.

Thanks,
-- 
Melih Mutlu
Microsoft


v26-0001-Reuse-Tablesync-Workers.patch
Description: Binary data


v26-0002-Reuse-connection-when-tablesync-workers-change-t.patch
Description: Binary data


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-03 Thread Peter Smith
Just to clarify my previous post, I meant we will need new v26* patches

v24-0001 -> not needed because v25-0001 pushed
v24-0002 -> v26-0001
v24-0003 -> v26-0002

On Thu, Aug 3, 2023 at 6:19 PM Peter Smith  wrote:
>
> Hi Melih,
>
> Now that v25-0001 has been pushed, can you please rebase the remaining 
> patches?
>
> --
> Kind Regards,
> Peter Smith.
> Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-03 Thread Peter Smith
Hi Melih,

Now that v25-0001 has been pushed, can you please rebase the remaining patches?

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-03 Thread Amit Kapila
On Thu, Aug 3, 2023 at 9:35 AM Peter Smith  wrote:
>
> On Wed, Aug 2, 2023 at 11:19 PM Amit Kapila  wrote:
> >
> > On Wed, Aug 2, 2023 at 4:09 PM Melih Mutlu  wrote:
> > >
> > > PFA an updated version with some of the earlier reviews addressed.
> > > Forgot to include them in the previous email.
> > >
> >
> > It is always better to explicitly tell which reviews are addressed but
> > anyway, I have done some minor cleanup in the 0001 patch including
> > removing includes which didn't seem necessary, modified a few
> > comments, and ran pgindent. I also thought of modifying some variable
> > names based on suggestions by Peter Smith in an email [1] but didn't
> > find many of them any better than the current ones so modified just a
> > few of those. If you guys are okay with this then let's commit it and
> > then we can focus more on the remaining patches.
> >
>
> I checked the latest patch v25-0001.
>
> LGTM.
>

Thanks, I have pushed 0001. Let's focus on the remaining patches.


-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-02 Thread Peter Smith
On Wed, Aug 2, 2023 at 11:19 PM Amit Kapila  wrote:
>
> On Wed, Aug 2, 2023 at 4:09 PM Melih Mutlu  wrote:
> >
> > PFA an updated version with some of the earlier reviews addressed.
> > Forgot to include them in the previous email.
> >
>
> It is always better to explicitly tell which reviews are addressed but
> anyway, I have done some minor cleanup in the 0001 patch including
> removing includes which didn't seem necessary, modified a few
> comments, and ran pgindent. I also thought of modifying some variable
> names based on suggestions by Peter Smith in an email [1] but didn't
> find many of them any better than the current ones so modified just a
> few of those. If you guys are okay with this then let's commit it and
> then we can focus more on the remaining patches.
>

I checked the latest patch v25-0001.

LGTM.

~~

BTW, I have re-tested many cases of HEAD versus HEAD+v25-0001 (using
current test scripts previously mentioned in this thread). Because
v25-0001 is only a refactoring patch we expect that the results should
be the same as for HEAD, and that is what I observed.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-02 Thread Amit Kapila
On Wed, Aug 2, 2023 at 4:09 PM Melih Mutlu  wrote:
>
> PFA an updated version with some of the earlier reviews addressed.
> Forgot to include them in the previous email.
>

It is always better to explicitly tell which reviews are addressed but
anyway, I have done some minor cleanup in the 0001 patch including
removing includes which didn't seem necessary, modified a few
comments, and ran pgindent. I also thought of modifying some variable
names based on suggestions by Peter Smith in an email [1] but didn't
find many of them any better than the current ones so modified just a
few of those. If you guys are okay with this then let's commit it and
then we can focus more on the remaining patches.

[1] - 
https://www.postgresql.org/message-id/CAHut%2BPs3Du9JFmhecWY8%2BVFD11VLOkSmB36t_xWHHQJNMpdA-A%40mail.gmail.com

-- 
With Regards,
Amit Kapila.


v25-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch
Description: Binary data


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-02 Thread Melih Mutlu
Hi,

>
PFA an updated version with some of the earlier reviews addressed.
Forgot to include them in the previous email.

Thanks,
-- 
Melih Mutlu
Microsoft


v24-0003-Reuse-connection-when-tablesync-workers-change-t.patch
Description: Binary data


v24-0002-Reuse-Tablesync-Workers.patch
Description: Binary data


v24-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch
Description: Binary data


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-02 Thread Melih Mutlu
Hi,

Amit Kapila , 2 Ağu 2023 Çar, 12:01 tarihinde şunu
yazdı:

> I think we are getting the error (ERROR:  could not find logical
> decoding starting point) because we wouldn't have waited for WAL to
> become available before reading it. It could happen due to the
> following code:
> WalSndWaitForWal()
> {
> ...
> if (streamingDoneReceiving && streamingDoneSending &&
> !pq_is_send_pending())
> break;
> ..
> }
>
> Now, it seems that in 0003 patch, instead of resetting flags
> streamingDoneSending, and streamingDoneReceiving before start
> replication, we should reset before create logical slots because we
> need to read the WAL during that time as well to find the consistent
> point.
>

Thanks for the suggestion Amit. I've been looking into this recently and
couldn't figure out the cause until now.
I quickly made the fix in 0003. Seems like it resolved the "could not find
logical decoding starting point" errors.

vignesh C , 1 Ağu 2023 Sal, 09:32 tarihinde şunu yazdı:

> I agree that  "no copy in progress issue" issue has nothing to do with
> 0001 patch. This issue is present with the 0002 patch.
> In the case when the tablesync worker has to apply the transactions
> after the table is synced, the tablesync worker sends the feedback of
> writepos, applypos and flushpos which results in "No copy in progress"
> error as the stream has ended already. Fixed it by exiting the
> streaming loop if the tablesync worker is done with the
> synchronization. The attached 0004 patch has the changes for the same.
> The rest of v22 patches are the same patch that were posted by Melih
> in the earlier mail.


Thanks for the fix. I placed it into 0002 with a slight change as follows:

- send_feedback(last_received, false, false);
> + if (!MyLogicalRepWorker->relsync_completed)
> + send_feedback(last_received, false, false);


IMHO relsync_completed means simply the same with streaming_done, that's
why I wanted to check that flag instead of an additional goto statement.
Does it make sense to you as well?

Thanks,
-- 
Melih Mutlu
Microsoft


v23-0002-Reuse-Tablesync-Workers.patch
Description: Binary data


v23-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch
Description: Binary data


v23-0003-Reuse-connection-when-tablesync-workers-change-t.patch
Description: Binary data


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-02 Thread Amit Kapila
On Tue, Aug 1, 2023 at 9:44 AM Peter Smith  wrote:
>
>
> FYI, here is some more information about ERRORs seen.
>
> The patches were re-tested -- applied in stages (and also against the
> different scripts) to identify where the problem was introduced. Below
> are the observations:
>
> ~~~
>
> Using original test scripts
>
> 1. Using only patch v21-0001
> - no errors
>
> 2. Using only patch v21-0001+0002
> - no errors
>
> 3. Using patch v21-0001+0002+0003
> - no errors
>
> ~~~
>
> Using the "busy loop" test scripts for long transactions
>
> 1. Using only patch v21-0001
> - no errors
>
> 2. Using only patch v21-0001+0002
> - gives errors for "no copy in progress issue"
> e.g. ERROR:  could not send data to WAL stream: no COPY in progress
>
> 3. Using patch v21-0001+0002+0003
> - gives the same "no copy in progress issue" errors as above
> e.g. ERROR:  could not send data to WAL stream: no COPY in progress
> - and also gives slot consistency point errors
> e.g. ERROR:  could not create replication slot
> "pg_16700_sync_16514_7261998170966054867": ERROR:  could not find
> logical decoding starting point
> e.g. LOG:  could not drop replication slot
> "pg_16700_sync_16454_7261998170966054867" on publisher: ERROR:
> replication slot "pg_16700_sync_16454_7261998170966054867" does not
> exist
>

I think we are getting the error (ERROR:  could not find logical
decoding starting point) because we wouldn't have waited for WAL to
become available before reading it. It could happen due to the
following code:
WalSndWaitForWal()
{
...
if (streamingDoneReceiving && streamingDoneSending &&
!pq_is_send_pending())
break;
..
}

Now, it seems that in 0003 patch, instead of resetting flags
streamingDoneSending, and streamingDoneReceiving before start
replication, we should reset before create logical slots because we
need to read the WAL during that time as well to find the consistent
point.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-08-01 Thread vignesh C
On Tue, 1 Aug 2023 at 09:44, Peter Smith  wrote:
>
> On Fri, Jul 28, 2023 at 5:22 PM Peter Smith  wrote:
> >
> > Hi Melih,
> >
> > BACKGROUND
> > --
> >
> > We wanted to compare performance for the 2 different reuse-worker
> > designs, when the apply worker is already busy handling other
> > replications, and then simultaneously the test table tablesyncs are
> > occurring.
> >
> > To test this scenario, some test scripts were written (described
> > below). For comparisons, the scripts were then run using a build of
> > HEAD; design #1 (v21); design #2 (0718).
> >
> > HOW THE TEST WORKS
> > --
> >
> > Overview:
> > 1. The apply worker is made to subscribe to a 'busy_tbl'.
> > 2. After the SUBSCRIPTION is created, the publisher-side then loops
> > (forever) doing INSERTS into that busy_tbl.
> > 3. While the apply worker is now busy, the subscriber does an ALTER
> > SUBSCRIPTION REFRESH PUBLICATION to subscribe to all the other test
> > tables.
> > 4. We time how long it takes for all tablsyncs to complete
> > 5. Repeat above for different numbers of empty tables (10, 100, 1000,
> > 2000) and different numbers of sync workers (2, 4, 8, 16)
> >
> > Scripts
> > ---
> >
> > (PSA 4 scripts to implement this logic)
> >
> > testrun script
> > - this does common setup (do_one_test_setup) and then the pub/sub
> > scripts (do_one_test_PUB and do_one_test_SUB -- see below) are run in
> > parallel
> > - repeat 10 times
> >
> > do_one_test_setup script
> > - init and start instances
> > - ipc setup tables and procedures
> >
> > do_one_test_PUB script
> > - ipc setup pub/sub
> > - table setup
> > - publishes the "busy_tbl", but then waits for the subscriber to
> > subscribe to only this one
> > - alters the publication to include all other tables (so subscriber
> > will see these only after the ALTER SUBSCRIPTION PUBLICATION REFRESH)
> > - enter a busy INSERT loop until it informed by the subscriber that
> > the test is finished
> >
> > do_one_test_SUB script
> > - ipc setup pub/sub
> > - table setup
> > - subscribes only to "busy_tbl", then informs the publisher when that
> > is done (this will cause the publisher to commence the stay_busy loop)
> > - after it knows the publishing busy loop has started it does
> > - ALTER SUBSCRIPTION REFRESH PUBLICATION
> > - wait until all the tablesyncs are ready <=== This is the part that
> > is timed for the test RESULT
> >
> > PROBLEM
> > ---
> >
> > Looking at the output files (e.g. *.dat_PUB and *.dat_SUB)  they seem
> > to confirm the tests are working how we wanted.
> >
> > Unfortunately, there is some slot problem for the patched builds (both
> > designs #1 and #2). e.g. Search "ERROR" in the *.log files and see
> > many slot-related errors.
> >
> > Please note - running these same scripts with HEAD build gave no such
> > errors. So it appears to be a patch problem.
> >
>
> Hi
>
> FYI, here is some more information about ERRORs seen.
>
> The patches were re-tested -- applied in stages (and also against the
> different scripts) to identify where the problem was introduced. Below
> are the observations:
>
> ~~~
>
> Using original test scripts
>
> 1. Using only patch v21-0001
> - no errors
>
> 2. Using only patch v21-0001+0002
> - no errors
>
> 3. Using patch v21-0001+0002+0003
> - no errors
>
> ~~~
>
> Using the "busy loop" test scripts for long transactions
>
> 1. Using only patch v21-0001
> - no errors
>
> 2. Using only patch v21-0001+0002
> - gives errors for "no copy in progress issue"
> e.g. ERROR:  could not send data to WAL stream: no COPY in progress
>
> 3. Using patch v21-0001+0002+0003
> - gives the same "no copy in progress issue" errors as above
> e.g. ERROR:  could not send data to WAL stream: no COPY in progress
> - and also gives slot consistency point errors
> e.g. ERROR:  could not create replication slot
> "pg_16700_sync_16514_7261998170966054867": ERROR:  could not find
> logical decoding starting point
> e.g. LOG:  could not drop replication slot
> "pg_16700_sync_16454_7261998170966054867" on publisher: ERROR:
> replication slot "pg_16700_sync_16454_7261998170966054867" does not
> exist

I agree that  "no copy in progress issue" issue has nothing to do with
0001 patch. This issue is present with the 0002 patch.
In the case when the tablesync worker has to apply the transactions
after the table is synced, the tablesync worker sends the feedback of
writepos, applypos and flushpos which results in "No copy in progress"
error as the stream has ended already. Fixed it by exiting the
streaming loop if the tablesync worker is done with the
synchronization. The attached 0004 patch has the changes for the same.
The rest of v22 patches are the same patch that were posted by Melih
in the earlier mail.

Regards,
Vignesh
From bd18bd59be0a263cb3385353e73ec25542bdeff2 Mon Sep 17 00:00:00 2001
From: Melih Mutlu 
Date: Tue, 4 Jul 2023 22:04:46 +0300
Subject: [PATCH v22 2/3] Reuse Tablesync Workers

Before this patch, tablesync workers were 

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-31 Thread Peter Smith
On Fri, Jul 28, 2023 at 5:22 PM Peter Smith  wrote:
>
> Hi Melih,
>
> BACKGROUND
> --
>
> We wanted to compare performance for the 2 different reuse-worker
> designs, when the apply worker is already busy handling other
> replications, and then simultaneously the test table tablesyncs are
> occurring.
>
> To test this scenario, some test scripts were written (described
> below). For comparisons, the scripts were then run using a build of
> HEAD; design #1 (v21); design #2 (0718).
>
> HOW THE TEST WORKS
> --
>
> Overview:
> 1. The apply worker is made to subscribe to a 'busy_tbl'.
> 2. After the SUBSCRIPTION is created, the publisher-side then loops
> (forever) doing INSERTS into that busy_tbl.
> 3. While the apply worker is now busy, the subscriber does an ALTER
> SUBSCRIPTION REFRESH PUBLICATION to subscribe to all the other test
> tables.
> 4. We time how long it takes for all tablsyncs to complete
> 5. Repeat above for different numbers of empty tables (10, 100, 1000,
> 2000) and different numbers of sync workers (2, 4, 8, 16)
>
> Scripts
> ---
>
> (PSA 4 scripts to implement this logic)
>
> testrun script
> - this does common setup (do_one_test_setup) and then the pub/sub
> scripts (do_one_test_PUB and do_one_test_SUB -- see below) are run in
> parallel
> - repeat 10 times
>
> do_one_test_setup script
> - init and start instances
> - ipc setup tables and procedures
>
> do_one_test_PUB script
> - ipc setup pub/sub
> - table setup
> - publishes the "busy_tbl", but then waits for the subscriber to
> subscribe to only this one
> - alters the publication to include all other tables (so subscriber
> will see these only after the ALTER SUBSCRIPTION PUBLICATION REFRESH)
> - enter a busy INSERT loop until it informed by the subscriber that
> the test is finished
>
> do_one_test_SUB script
> - ipc setup pub/sub
> - table setup
> - subscribes only to "busy_tbl", then informs the publisher when that
> is done (this will cause the publisher to commence the stay_busy loop)
> - after it knows the publishing busy loop has started it does
> - ALTER SUBSCRIPTION REFRESH PUBLICATION
> - wait until all the tablesyncs are ready <=== This is the part that
> is timed for the test RESULT
>
> PROBLEM
> ---
>
> Looking at the output files (e.g. *.dat_PUB and *.dat_SUB)  they seem
> to confirm the tests are working how we wanted.
>
> Unfortunately, there is some slot problem for the patched builds (both
> designs #1 and #2). e.g. Search "ERROR" in the *.log files and see
> many slot-related errors.
>
> Please note - running these same scripts with HEAD build gave no such
> errors. So it appears to be a patch problem.
>

Hi

FYI, here is some more information about ERRORs seen.

The patches were re-tested -- applied in stages (and also against the
different scripts) to identify where the problem was introduced. Below
are the observations:

~~~

Using original test scripts

1. Using only patch v21-0001
- no errors

2. Using only patch v21-0001+0002
- no errors

3. Using patch v21-0001+0002+0003
- no errors

~~~

Using the "busy loop" test scripts for long transactions

1. Using only patch v21-0001
- no errors

2. Using only patch v21-0001+0002
- gives errors for "no copy in progress issue"
e.g. ERROR:  could not send data to WAL stream: no COPY in progress

3. Using patch v21-0001+0002+0003
- gives the same "no copy in progress issue" errors as above
e.g. ERROR:  could not send data to WAL stream: no COPY in progress
- and also gives slot consistency point errors
e.g. ERROR:  could not create replication slot
"pg_16700_sync_16514_7261998170966054867": ERROR:  could not find
logical decoding starting point
e.g. LOG:  could not drop replication slot
"pg_16700_sync_16454_7261998170966054867" on publisher: ERROR:
replication slot "pg_16700_sync_16454_7261998170966054867" does not
exist

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-27 Thread Peter Smith
On Thu, Jul 27, 2023 at 11:30 PM Melih Mutlu  wrote:
>
> Hi Peter,
>
> Peter Smith , 26 Tem 2023 Çar, 07:40 tarihinde şunu 
> yazdı:
>>
>> Here are some comments for patch v22-0001.
>>
>> ==
>> 1. General -- naming conventions
>>
>> There is quite a lot of inconsistency with variable/parameter naming
>> styles in this patch. I understand in most cases the names are copied
>> unchanged from the original functions. Still, since this is a big
>> refactor anyway, it can also be a good opportunity to clean up those
>> inconsistencies instead of just propagating them to different places.
>> IIUC, the usual reluctance to rename things because it would cause
>> backpatch difficulties doesn't apply here (since everything is being
>> refactored anyway).
>>
>> E.g. Consider using use snake_case names more consistently in the
>> following places:
>
>
> I can simply change the places you mentioned, that seems okay to me.
> The reason why I did not change the namings in existing variables/functions 
> is because I did (and still do) not get what's the naming conventions in 
> those files. Is snake_case the convention for variables in those files (or in 
> general)?
>

TBH, I also don't know if there is a specific Postgres coding
guideline to use snake_case or not (and Chat-GPT did not know either
when I asked about it). I only assumed snake_case in my previous
review comment because the mentioned vars were already all lowercase.
Anyway, the point was that whatever style is chosen, it ought to be
used *consistently* because having a random mixture of styles in the
same function (e.g. worker_slot, originname, origin_startpos,
myslotname, options, server_version) seems messy. Meanwhile, I think
Amit suggested [1] that for now, we only need to worry about the name
consistency in new code.


>> 2. SetupApplyOrSyncWorker
>>
>> -ApplyWorkerMain(Datum main_arg)
>> +SetupApplyOrSyncWorker(int worker_slot)
>>  {
>> - int worker_slot = DatumGetInt32(main_arg);
>> - char originname[NAMEDATALEN];
>> - XLogRecPtr origin_startpos = InvalidXLogRecPtr;
>> - char*myslotname = NULL;
>> - WalRcvStreamOptions options;
>> - int server_version;
>> -
>> - InitializingApplyWorker = true;
>> -
>>   /* Attach to slot */
>>   logicalrep_worker_attach(worker_slot);
>>
>> + Assert(am_tablesync_worker() || am_leader_apply_worker());
>> +
>>
>> Why is the Assert not the very first statement of this function?
>
>
> I would also prefer to assert in the very beginning but am_tablesync_worker 
> and am_leader_apply_worker require MyLogicalRepWorker to be not NULL. And 
> MyLogicalRepWorker is assigned in logicalrep_worker_attach. I can change this 
> if you think there is a better way to check the worker type.
>

I see. In that case your Assert LGTM.

--
[1] 
https://www.postgresql.org/message-id/CAA4eK1%2Bh9hWDAKupsoiw556xqh7uvj_F1pjFJc4jQhL89HdGww%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-27 Thread Melih Mutlu
Hi Peter,

Peter Smith , 26 Tem 2023 Çar, 07:40 tarihinde şunu
yazdı:

> Here are some comments for patch v22-0001.
>
> ==
> 1. General -- naming conventions
>
> There is quite a lot of inconsistency with variable/parameter naming
> styles in this patch. I understand in most cases the names are copied
> unchanged from the original functions. Still, since this is a big
> refactor anyway, it can also be a good opportunity to clean up those
> inconsistencies instead of just propagating them to different places.
> IIUC, the usual reluctance to rename things because it would cause
> backpatch difficulties doesn't apply here (since everything is being
> refactored anyway).
>
> E.g. Consider using use snake_case names more consistently in the
> following places:
>

I can simply change the places you mentioned, that seems okay to me.
The reason why I did not change the namings in existing variables/functions
is because I did (and still do) not get what's the naming conventions in
those files. Is snake_case the convention for variables in those files (or
in general)?

2. SetupApplyOrSyncWorker
>
> -ApplyWorkerMain(Datum main_arg)
> +SetupApplyOrSyncWorker(int worker_slot)
>  {
> - int worker_slot = DatumGetInt32(main_arg);
> - char originname[NAMEDATALEN];
> - XLogRecPtr origin_startpos = InvalidXLogRecPtr;
> - char*myslotname = NULL;
> - WalRcvStreamOptions options;
> - int server_version;
> -
> - InitializingApplyWorker = true;
> -
>   /* Attach to slot */
>   logicalrep_worker_attach(worker_slot);
>
> + Assert(am_tablesync_worker() || am_leader_apply_worker());
> +
>
> Why is the Assert not the very first statement of this function?
>

I would also prefer to assert in the very beginning but am_tablesync_worker
and am_leader_apply_worker require MyLogicalRepWorker to be not NULL.
And MyLogicalRepWorker is assigned in logicalrep_worker_attach. I can
change this if you think there is a better way to check the worker type.

Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-26 Thread Amit Kapila
On Thu, Jul 27, 2023 at 6:46 AM Peter Smith  wrote:
>
> Here are some review comments for v22-0003
>
> ==
>
> 1. ApplicationNameForTablesync
> +/*
> + * Determine the application_name for tablesync workers.
> + *
> + * Previously, the replication slot name was used as application_name. Since
> + * it's possible to reuse tablesync workers now, a tablesync worker can 
> handle
> + * several different replication slots during its lifetime. Therefore, we
> + * cannot use the slot name as application_name anymore. Instead, the slot
> + * number of the tablesync worker is used as a part of the application_name.
> + *
> + * FIXME: if the tablesync worker starts to reuse the replication slot during
> + * synchronization, we should again use the replication slot name as
> + * application_name.
> + */
> +static void
> +ApplicationNameForTablesync(Oid suboid, int worker_slot,
> + char *application_name, Size szapp)
> +{
> + snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid,
> + worker_slot, GetSystemIdentifier());
> +}
>
> 1a.
> The intent of the "FIXME" comment was not clear. Is this some existing
> problem that needs addressing, or is this really more like just an
> "XXX" warning/note for the future, in case the tablesync logic
> changes?
>

This seems to be a Note for the future, so better to use XXX notation here.

> ~
>
> 1b.
> Since this is a new function, should it be named according to the
> convention for static functions?
>
> e.g.
> ApplicationNameForTablesync -> app_name_for_tablesync
>

I think for now let's follow the style for similar functions like
ReplicationOriginNameForLogicalRep() and
ReplicationSlotNameForTablesync().

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-26 Thread Peter Smith
Here are some review comments for v22-0003

==

1. ApplicationNameForTablesync
+/*
+ * Determine the application_name for tablesync workers.
+ *
+ * Previously, the replication slot name was used as application_name. Since
+ * it's possible to reuse tablesync workers now, a tablesync worker can handle
+ * several different replication slots during its lifetime. Therefore, we
+ * cannot use the slot name as application_name anymore. Instead, the slot
+ * number of the tablesync worker is used as a part of the application_name.
+ *
+ * FIXME: if the tablesync worker starts to reuse the replication slot during
+ * synchronization, we should again use the replication slot name as
+ * application_name.
+ */
+static void
+ApplicationNameForTablesync(Oid suboid, int worker_slot,
+ char *application_name, Size szapp)
+{
+ snprintf(application_name, szapp, "pg_%u_sync_%i_" UINT64_FORMAT, suboid,
+ worker_slot, GetSystemIdentifier());
+}

1a.
The intent of the "FIXME" comment was not clear. Is this some existing
problem that needs addressing, or is this really more like just an
"XXX" warning/note for the future, in case the tablesync logic
changes?

~

1b.
Since this is a new function, should it be named according to the
convention for static functions?

e.g.
ApplicationNameForTablesync -> app_name_for_tablesync

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-26 Thread Peter Smith
Here are some review comments for v22-0002

==
1. General - errmsg

AFAIK, the errmsg part does not need to be enclosed by extra parentheses.

e.g.
BEFORE
ereport(LOG,
(errmsg("logical replication table synchronization worker for
subscription \"%s\" has finished",
MySubscription->name)));
AFTER
ereport(LOG,
errmsg("logical replication table synchronization worker for
subscription \"%s\" has finished",
MySubscription->name));

~

The patch has multiple cases similar to that example.

==
src/backend/replication/logical/tablesync.c

2.
+ if (reuse_worker)
+ {
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\" will be reused to sync table \"%s\" with relid
%u.",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ }
+ else
+ {
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\" has finished",
+ MySubscription->name)));
+ }

These brackets { } are not really necessary.

~~~

3. TablesyncWorkerMain
+ for (;!done;)
+ {
+ List*rstates;
+ ListCell   *lc;
+
+ run_tablesync_worker();
+
+ if (IsTransactionState())
+ CommitTransactionCommand();
+
+ if (MyLogicalRepWorker->relsync_completed)
+ {
+ /*
+ * This tablesync worker is 'done' unless another table that needs
+ * syncing is found.
+ */
+ done = true;

Those variables 'rstates' and 'lc' do not need to be declared at this
scope -- they can be declared further down, closer to where they are
needed.

=
src/backend/replication/logical/worker.c

4. LogicalRepApplyLoop
+
+ if (am_tablesync_worker())
+ /*
+ * If relsync_completed is true, this means that the tablesync
+ * worker is done with synchronization. Streaming has already been
+ * ended by process_syncing_tables_for_sync. We should move to the
+ * next table if needed, or exit.
+ */
+ if (MyLogicalRepWorker->relsync_completed)
+ endofstream = true;

Here I think it is better to use bracketing { } for the outer "if",
instead of only relying on the indentation for readability. YMMV.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-25 Thread Amit Kapila
On Wed, Jul 26, 2023 at 10:10 AM Peter Smith  wrote:
>
> Here are some comments for patch v22-0001.
>
> ==
> 1. General -- naming conventions
>
> There is quite a lot of inconsistency with variable/parameter naming
> styles in this patch. I understand in most cases the names are copied
> unchanged from the original functions. Still, since this is a big
> refactor anyway, it can also be a good opportunity to clean up those
> inconsistencies instead of just propagating them to different places.
>

I am not against improving consistency in the naming of existing
variables but I feel it would be better to do as a separate patch
along with improving the consistency function names. For new
functions/variables, it would be good to follow a consistent style.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-25 Thread Peter Smith
Here are some comments for patch v22-0001.

==
1. General -- naming conventions

There is quite a lot of inconsistency with variable/parameter naming
styles in this patch. I understand in most cases the names are copied
unchanged from the original functions. Still, since this is a big
refactor anyway, it can also be a good opportunity to clean up those
inconsistencies instead of just propagating them to different places.
IIUC, the usual reluctance to rename things because it would cause
backpatch difficulties doesn't apply here (since everything is being
refactored anyway).

E.g. Consider using use snake_case names more consistently in the
following places:

~

1a. start_table_sync

+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+ char*syncslotname = NULL;

origin_startpos -> (no change)
myslotname -> my_slot_name (But, is there a better name for this than
calling it "my" slot name)
syncslotname -> sync_slot_name

~

1b. run_tablesync_worker

+static void
+run_tablesync_worker()
+{
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char*slotname = NULL;
+ WalRcvStreamOptions options;

originname -> origin_name
origin_startpos -> (no change)
slotname -> slot_name

~

1c. set_stream_options

+void
+set_stream_options(WalRcvStreamOptions *options,
+char *slotname,
+XLogRecPtr *origin_startpos)
+{
+ int server_version;

options -> (no change)
slotname -> slot_name
origin_startpos -> (no change)
server_version -> (no change)

~

1d. run_apply_worker

 static void
-start_apply(XLogRecPtr origin_startpos)
+run_apply_worker()
 {
- PG_TRY();
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos = InvalidXLogRecPtr;
+ char*slotname = NULL;
+ WalRcvStreamOptions options;
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char*err;
+ bool must_use_password;

originname -> origin_name
origin_startpos => (no change)
slotname -> slot_name
originid -> origin_id

==
src/backend/replication/logical/worker.c

2. SetupApplyOrSyncWorker

-ApplyWorkerMain(Datum main_arg)
+SetupApplyOrSyncWorker(int worker_slot)
 {
- int worker_slot = DatumGetInt32(main_arg);
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos = InvalidXLogRecPtr;
- char*myslotname = NULL;
- WalRcvStreamOptions options;
- int server_version;
-
- InitializingApplyWorker = true;
-
  /* Attach to slot */
  logicalrep_worker_attach(worker_slot);

+ Assert(am_tablesync_worker() || am_leader_apply_worker());
+

Why is the Assert not the very first statement of this function?

==
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-25 Thread Melih Mutlu
Hi,

Melih Mutlu , 21 Tem 2023 Cum, 12:47 tarihinde şunu
yazdı:

> I did not realize the order is the same with .c files. Good to know. I'll
> fix it along with other comments.
>

Addressed the recent reviews and attached the updated patches.

Thanks,
-- 
Melih Mutlu
Microsoft


v22-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch
Description: Binary data


v22-0002-Reuse-Tablesync-Workers.patch
Description: Binary data


v22-0003-Reuse-connection-when-tablesync-workers-change-t.patch
Description: Binary data


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-21 Thread Melih Mutlu
Peter Smith , 21 Tem 2023 Cum, 12:48 tarihinde şunu
yazdı:

> On Fri, Jul 21, 2023 at 5:24 PM Amit Kapila 
> wrote:
> >
> > On Fri, Jul 21, 2023 at 12:05 PM Peter Smith 
> wrote:
> > >
> > > On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila 
> wrote:
> > > >
>
> > > > The other thing I noticed is that we
> > > > don't seem to be consistent in naming functions in these files. For
> > > > example, shall we make all exposed functions follow camel case (like
> > > > InitializeLogRepWorker) and static functions follow _ style (like
> > > > run_apply_worker) or the other possibility is to use _ style for all
> > > > functions except may be the entry functions like ApplyWorkerMain()? I
> > > > don't know if there is already a pattern but if not then let's form
> it
> > > > now, so that code looks consistent.
> > > >
> > >
> > > +1 for using some consistent rule, but I think this may result in
> > > *many* changes, so it would be safer to itemize all the changes first,
> > > just to make sure everybody is OK with it first before updating
> > > everything.
> > >
> >
> > Fair enough. We can do that as a first patch and then work on the
> > refactoring patch to avoid introducing more inconsistencies or we can
> > do the refactoring patch first but keep all the new function names to
> > follow _ style.
> >
>
> Fixing the naming inconsistency will be more far-reaching than just a
> few functions affected by these "reuse" patches. There are plenty of
> existing functions already inconsistently named in the HEAD code. So
> perhaps this topic should be moved to a separate thread?
>

+1 for moving it to a separate thread. This is not something particularly
introduced by this patch.

Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-21 Thread Peter Smith
On Fri, Jul 21, 2023 at 5:24 PM Amit Kapila  wrote:
>
> On Fri, Jul 21, 2023 at 12:05 PM Peter Smith  wrote:
> >
> > On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila  wrote:
> > >

> > > The other thing I noticed is that we
> > > don't seem to be consistent in naming functions in these files. For
> > > example, shall we make all exposed functions follow camel case (like
> > > InitializeLogRepWorker) and static functions follow _ style (like
> > > run_apply_worker) or the other possibility is to use _ style for all
> > > functions except may be the entry functions like ApplyWorkerMain()? I
> > > don't know if there is already a pattern but if not then let's form it
> > > now, so that code looks consistent.
> > >
> >
> > +1 for using some consistent rule, but I think this may result in
> > *many* changes, so it would be safer to itemize all the changes first,
> > just to make sure everybody is OK with it first before updating
> > everything.
> >
>
> Fair enough. We can do that as a first patch and then work on the
> refactoring patch to avoid introducing more inconsistencies or we can
> do the refactoring patch first but keep all the new function names to
> follow _ style.
>

Fixing the naming inconsistency will be more far-reaching than just a
few functions affected by these "reuse" patches. There are plenty of
existing functions already inconsistently named in the HEAD code. So
perhaps this topic should be moved to a separate thread?

For example, here are some existing/proposed names:

===

worker.c (HEAD)

static functions
  DisableSubscriptionAndExit -> disable_subscription_and_exit
  FindReplTupleInLocalRel -> find_repl_tuple_in_local_rel
  TwoPhaseTransactionGid -> two_phase_transaction_gid
  TargetPrivilegesCheck -> target_privileges_check
  UpdateWorkerStats -> update_worker_stats
  LogicalRepApplyLoop -> logical_rep_apply_loop

non-static functions
  stream_stop_internal -> StreamStopInternal
  apply_spooled_messages -> ApplySpooledMessages
  apply_dispatch -> ApplyDispatch
  store_flush_position -> StoreFlushPosition
  set_apply_error_context_origin -> SetApplyErrorContextOrigin

===

tablesync.c (HEAD)

static functions
  FetchTableStates -> fetch_table_states

non-static functions
  invalidate_syncing_table_states -> InvalidateSyncingTableStates

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-21 Thread Melih Mutlu
Amit Kapila , 21 Tem 2023 Cum, 08:39 tarihinde
şunu yazdı:

> On Fri, Jul 21, 2023 at 7:30 AM Peter Smith  wrote:
> How about SetupLogRepWorker? The other thing I noticed is that we
> don't seem to be consistent in naming functions in these files. For
> example, shall we make all exposed functions follow camel case (like
> InitializeLogRepWorker) and static functions follow _ style (like
> run_apply_worker) or the other possibility is to use _ style for all
> functions except may be the entry functions like ApplyWorkerMain()? I
> don't know if there is already a pattern but if not then let's form it
> now, so that code looks consistent.
>

I agree that these files have inconsistencies in naming things.
Most of the time I can't really figure out which naming convention I should
use. I try to name things by looking at other functions with similar
responsibilities.


> 3.
> >  extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
> >  XLogRecPtr remote_lsn);
> > +extern void set_stream_options(WalRcvStreamOptions *options,
> > +char *slotname,
> > +XLogRecPtr *origin_startpos);
> > +
> > +extern void start_apply(XLogRecPtr origin_startpos);
> > +extern void DisableSubscriptionAndExit(void);
> > +extern void StartLogRepWorker(int worker_slot);
> >
> > This placement (esp. with the missing whitespace) seems to be grouping
> > the set_stream_options with the other 'pa' externs, which are all
> > under the comment "/* Parallel apply worker setup and interactions
> > */".
> >
> > Putting all these up near the other "extern void
> > InitializeLogRepWorker(void)" might be less ambiguous.
> >
>
> +1. Also, note that they should be in the same order as they are in .c
> files.
>

I did not realize the order is the same with .c files. Good to know. I'll
fix it along with other comments.

Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-21 Thread Amit Kapila
On Fri, Jul 21, 2023 at 12:05 PM Peter Smith  wrote:
>
> On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila  wrote:
> >
> > On Fri, Jul 21, 2023 at 7:30 AM Peter Smith  wrote:
> > >
> > > ~~~
> > >
> > > 2. StartLogRepWorker
> > >
> > > /* Common function to start the leader apply or tablesync worker. */
> > > void
> > > StartLogRepWorker(int worker_slot)
> > > {
> > > /* Attach to slot */
> > > logicalrep_worker_attach(worker_slot);
> > >
> > > /* Setup signal handling */
> > > pqsignal(SIGHUP, SignalHandlerForConfigReload);
> > > pqsignal(SIGTERM, die);
> > > BackgroundWorkerUnblockSignals();
> > >
> > > /*
> > > * We don't currently need any ResourceOwner in a walreceiver process, but
> > > * if we did, we could call CreateAuxProcessResourceOwner here.
> > > */
> > >
> > > /* Initialise stats to a sanish value */
> > > MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
> > > MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
> > >
> > > /* Load the libpq-specific functions */
> > > load_file("libpqwalreceiver", false);
> > >
> > > InitializeLogRepWorker();
> > >
> > > /* Connect to the origin and start the replication. */
> > > elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
> > > MySubscription->conninfo);
> > >
> > > /*
> > > * Setup callback for syscache so that we know when something changes in
> > > * the subscription relation state.
> > > */
> > > CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
> > >   invalidate_syncing_table_states,
> > >   (Datum) 0);
> > > }
> > >
> > > ~
> > >
> > > 2a.
> > > The function name seems a bit misleading because it is not really
> > > "starting" anything here - it is just more "initialization" code,
> > > right? Nor is it common to all kinds of LogRepWorker. Maybe the
> > > function could be named something else like 'InitApplyOrSyncWorker()'.
> > > -- see also #2c
> > >
> >
> > How about SetupLogRepWorker?
>
> The name is better than StartXXX, but still, SetupXXX seems a synonym
> of InitXXX. That is why I thought it is a bit awkward having 2
> functions with effectively the same name and the same
> initialization/setup purpose (the only difference is one function
> excludes parallel workers, and the other function is common to all
> workers).
>

I can't know of a better way. We can probably name it as
SetupApplyOrSyncWorker or something like that if you find that better.

> > The other thing I noticed is that we
> > don't seem to be consistent in naming functions in these files. For
> > example, shall we make all exposed functions follow camel case (like
> > InitializeLogRepWorker) and static functions follow _ style (like
> > run_apply_worker) or the other possibility is to use _ style for all
> > functions except may be the entry functions like ApplyWorkerMain()? I
> > don't know if there is already a pattern but if not then let's form it
> > now, so that code looks consistent.
> >
>
> +1 for using some consistent rule, but I think this may result in
> *many* changes, so it would be safer to itemize all the changes first,
> just to make sure everybody is OK with it first before updating
> everything.
>

Fair enough. We can do that as a first patch and then work on the
refactoring patch to avoid introducing more inconsistencies or we can
do the refactoring patch first but keep all the new function names to
follow _ style.

Apart from this, few more comments on 0001:
1.
+run_apply_worker(WalRcvStreamOptions *options,
+ char *slotname,
+ char *originname,
+ int originname_size,
+ XLogRecPtr *origin_startpos)

The caller neither uses nor passes the value of origin_startpos. So,
isn't it better to make origin_startpos local to run_apply_worker()?
It seems the same is true for some of the other parameters slotname,
originname, originname_size. Is there a reason to keep these as
arguments in this function?

2.
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,
+ char *slotname,
+ char *originname,
+ int originname_size,
+ XLogRecPtr *origin_startpos)

The comments in the previous point seem to apply to this as well.

3.
+ set_stream_options(options, slotname, origin_startpos);
+
+ walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())

This last check is done in set_stream_options() and here as well. I
don't see any reason to give different answers at both places but
before the patch, we were not relying on any such assumption that this
check will always give the same answer considering the answer could be
different due to AllTablesyncsReady(). Can we move this check outside
set_stream_options()?

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-21 Thread Peter Smith
On Fri, Jul 21, 2023 at 3:39 PM Amit Kapila  wrote:
>
> On Fri, Jul 21, 2023 at 7:30 AM Peter Smith  wrote:
> >
> > ~~~
> >
> > 2. StartLogRepWorker
> >
> > /* Common function to start the leader apply or tablesync worker. */
> > void
> > StartLogRepWorker(int worker_slot)
> > {
> > /* Attach to slot */
> > logicalrep_worker_attach(worker_slot);
> >
> > /* Setup signal handling */
> > pqsignal(SIGHUP, SignalHandlerForConfigReload);
> > pqsignal(SIGTERM, die);
> > BackgroundWorkerUnblockSignals();
> >
> > /*
> > * We don't currently need any ResourceOwner in a walreceiver process, but
> > * if we did, we could call CreateAuxProcessResourceOwner here.
> > */
> >
> > /* Initialise stats to a sanish value */
> > MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
> > MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
> >
> > /* Load the libpq-specific functions */
> > load_file("libpqwalreceiver", false);
> >
> > InitializeLogRepWorker();
> >
> > /* Connect to the origin and start the replication. */
> > elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
> > MySubscription->conninfo);
> >
> > /*
> > * Setup callback for syscache so that we know when something changes in
> > * the subscription relation state.
> > */
> > CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
> >   invalidate_syncing_table_states,
> >   (Datum) 0);
> > }
> >
> > ~
> >
> > 2a.
> > The function name seems a bit misleading because it is not really
> > "starting" anything here - it is just more "initialization" code,
> > right? Nor is it common to all kinds of LogRepWorker. Maybe the
> > function could be named something else like 'InitApplyOrSyncWorker()'.
> > -- see also #2c
> >
>
> How about SetupLogRepWorker?

The name is better than StartXXX, but still, SetupXXX seems a synonym
of InitXXX. That is why I thought it is a bit awkward having 2
functions with effectively the same name and the same
initialization/setup purpose (the only difference is one function
excludes parallel workers, and the other function is common to all
workers).

> The other thing I noticed is that we
> don't seem to be consistent in naming functions in these files. For
> example, shall we make all exposed functions follow camel case (like
> InitializeLogRepWorker) and static functions follow _ style (like
> run_apply_worker) or the other possibility is to use _ style for all
> functions except may be the entry functions like ApplyWorkerMain()? I
> don't know if there is already a pattern but if not then let's form it
> now, so that code looks consistent.
>

+1 for using some consistent rule, but I think this may result in
*many* changes, so it would be safer to itemize all the changes first,
just to make sure everybody is OK with it first before updating
everything.

--
Kind Regards,
Peter Smith




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Amit Kapila
On Fri, Jul 21, 2023 at 7:30 AM Peter Smith  wrote:
>
> ~~~
>
> 2. StartLogRepWorker
>
> /* Common function to start the leader apply or tablesync worker. */
> void
> StartLogRepWorker(int worker_slot)
> {
> /* Attach to slot */
> logicalrep_worker_attach(worker_slot);
>
> /* Setup signal handling */
> pqsignal(SIGHUP, SignalHandlerForConfigReload);
> pqsignal(SIGTERM, die);
> BackgroundWorkerUnblockSignals();
>
> /*
> * We don't currently need any ResourceOwner in a walreceiver process, but
> * if we did, we could call CreateAuxProcessResourceOwner here.
> */
>
> /* Initialise stats to a sanish value */
> MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
> MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
>
> /* Load the libpq-specific functions */
> load_file("libpqwalreceiver", false);
>
> InitializeLogRepWorker();
>
> /* Connect to the origin and start the replication. */
> elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
> MySubscription->conninfo);
>
> /*
> * Setup callback for syscache so that we know when something changes in
> * the subscription relation state.
> */
> CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
>   invalidate_syncing_table_states,
>   (Datum) 0);
> }
>
> ~
>
> 2a.
> The function name seems a bit misleading because it is not really
> "starting" anything here - it is just more "initialization" code,
> right? Nor is it common to all kinds of LogRepWorker. Maybe the
> function could be named something else like 'InitApplyOrSyncWorker()'.
> -- see also #2c
>

How about SetupLogRepWorker? The other thing I noticed is that we
don't seem to be consistent in naming functions in these files. For
example, shall we make all exposed functions follow camel case (like
InitializeLogRepWorker) and static functions follow _ style (like
run_apply_worker) or the other possibility is to use _ style for all
functions except may be the entry functions like ApplyWorkerMain()? I
don't know if there is already a pattern but if not then let's form it
now, so that code looks consistent.

> ~
>
> 2b.
> Should this have Assert to ensure this is only called from leader
> apply or tablesync? -- see also #2c
>
> ~
>
> 2c.
> IMO maybe the best/tidiest way to do this is not to introduce a new
> function at all. Instead, just put all this "common init" code into
> the existing "common init" function ('InitializeLogRepWorker') and
> execute it only if (am_tablesync_worker() || am_leader_apply_worker())
> { }.
>

I don't like 2c much because it will make InitializeLogRepWorker()
have two kinds of initializations.

> ==
> src/include/replication/worker_internal.h
>
> 3.
>  extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
>  XLogRecPtr remote_lsn);
> +extern void set_stream_options(WalRcvStreamOptions *options,
> +char *slotname,
> +XLogRecPtr *origin_startpos);
> +
> +extern void start_apply(XLogRecPtr origin_startpos);
> +extern void DisableSubscriptionAndExit(void);
> +extern void StartLogRepWorker(int worker_slot);
>
> This placement (esp. with the missing whitespace) seems to be grouping
> the set_stream_options with the other 'pa' externs, which are all
> under the comment "/* Parallel apply worker setup and interactions
> */".
>
> Putting all these up near the other "extern void
> InitializeLogRepWorker(void)" might be less ambiguous.
>

+1. Also, note that they should be in the same order as they are in .c files.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Peter Smith
Some review comments for v21-0002.

On Thu, Jul 20, 2023 at 11:41 PM Melih Mutlu  wrote:
>
> Hi,
>
> Attached the updated patches with recent reviews addressed.
>
> See below for my comments:
>
> Peter Smith , 19 Tem 2023 Çar, 06:08 tarihinde şunu 
> yazdı:
>>
>> 5.
>> + /* Found a table for next iteration */
>> + finish_sync_worker(true);
>> +
>> + StartTransactionCommand();
>> + ereport(LOG,
>> + (errmsg("logical replication worker for subscription \"%s\" will be
>> reused to sync table \"%s\" with relid %u.",
>> + MySubscription->name,
>> + get_rel_name(MyLogicalRepWorker->relid),
>> + MyLogicalRepWorker->relid)));
>> + CommitTransactionCommand();
>> +
>> + done = false;
>> + break;
>> + }
>> + LWLockRelease(LogicalRepWorkerLock);
>>
>>
>> 5b.
>> Isn't there a missing call to that LWLockRelease, if the 'break' happens?
>
>
> Lock is already released before break, if that's the lock you meant:
>
>> /* Update worker state for the next table */
>> MyLogicalRepWorker->relid = rstate->relid;
>> MyLogicalRepWorker->relstate = rstate->state;
>> MyLogicalRepWorker->relstate_lsn = rstate->lsn;
>> LWLockRelease(LogicalRepWorkerLock);
>>
>>
>> /* Found a table for next iteration */
>> finish_sync_worker(true);
>> done = false;
>> break;
>
>

Sorry, I misread the code. You are right.

==
src/backend/replication/logical/tablesync.c

1.
+ if (!reuse_worker)
+ {
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\" has finished",
+ MySubscription->name)));
+ }
+ else
+ {
+ ereport(LOG,
+ (errmsg("logical replication worker for subscription \"%s\" will be
reused to sync table \"%s\" with relid %u.",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ }

1a.
We know this must be a tablesync worker, so I think that second errmsg
should also be saying "logical replication table synchronization
worker".

~

1b.
Since this is if/else anyway, is it simpler to be positive and say "if
(reuse_worker)" instead of the negative "if (!reuse_worker)"

~~~

2. run_tablesync_worker
 {
+ MyLogicalRepWorker->relsync_completed = false;
+
+ /* Start table synchronization. */
  start_table_sync(origin_startpos, );
This still contains the added comment that I'd previously posted I
thought was adding anything useful. Also, I didn't think this comment
exists in the HEAD code.
==
src/backend/replication/logical/worker.c

3. LogicalRepApplyLoop

+ /*
+ * apply_dispatch() may have gone into apply_handle_commit()
+ * which can call process_syncing_tables_for_sync.
+ *
+ * process_syncing_tables_for_sync decides whether the sync of
+ * the current table is completed. If it is completed,
+ * streaming must be already ended. So, we can break the loop.
+ */
+ if (am_tablesync_worker() &&
+ MyLogicalRepWorker->relsync_completed)
+ {
+ endofstream = true;
+ break;
+ }
+

Maybe just personal taste, but IMO it is better to rearrange like
below because then there is no reason to read the long comment except
for tablesync workers.

if (am_tablesync_worker())
{
/*
 * apply_dispatch() may have gone into apply_handle_commit()
 * which can call process_syncing_tables_for_sync.
 *
 * process_syncing_tables_for_sync decides whether the sync of
 * the current table is completed. If it is completed,
 * streaming must be already ended. So, we can break the loop.
*/
if (MyLogicalRepWorker->relsync_completed)
{
endofstream = true;
break;
}
}

~~~

4. LogicalRepApplyLoop

+
+ /*
+ * If relsync_completed is true, this means that the tablesync
+ * worker is done with synchronization. Streaming has already been
+ * ended by process_syncing_tables_for_sync. We should move to the
+ * next table if needed, or exit.
+ */
+ if (am_tablesync_worker() &&
+ MyLogicalRepWorker->relsync_completed)
+ endofstream = true;

Ditto the same comment about rearranging the condition, as #3 above.

==
src/include/replication/worker_internal.h

5.
+ /*
+ * Indicates whether tablesync worker has completed syncing its assigned
+ * table.
+ */
+ bool relsync_completed;
+

Isn't it better to arrange this to be adjacent to other relXXX fields,
so they all clearly belong to that "Used for initial table
synchronization." group?

For example, something like:

/* Used for initial table synchronization. */
Oid  relid;
char relstate;
XLogRecPtr relstate_lsn;
slock_t relmutex;
bool relsync_completed; /* has tablesync finished syncing
the assigned table? */

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Peter Smith
Some review comments for v21-0001

==
src/backend/replication/logical/worker.c

1. InitializeLogRepWorker

  if (am_tablesync_worker())
  ereport(LOG,
- (errmsg("logical replication table synchronization worker for
subscription \"%s\", table \"%s\" has started",
+ (errmsg("logical replication worker for subscription \"%s\", table
\"%s\" has started",
  MySubscription->name,
  get_rel_name(MyLogicalRepWorker->relid;

I think this should not be changed. IIUC that decision for using the
generic worker name for translations was only when the errmsg was in
shared code where the worker type was not clear from existing
conditions. See also previous review comments [1].

~~~

2. StartLogRepWorker

/* Common function to start the leader apply or tablesync worker. */
void
StartLogRepWorker(int worker_slot)
{
/* Attach to slot */
logicalrep_worker_attach(worker_slot);

/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();

/*
* We don't currently need any ResourceOwner in a walreceiver process, but
* if we did, we could call CreateAuxProcessResourceOwner here.
*/

/* Initialise stats to a sanish value */
MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
MyLogicalRepWorker->reply_time = GetCurrentTimestamp();

/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);

InitializeLogRepWorker();

/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);

/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
*/
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
  invalidate_syncing_table_states,
  (Datum) 0);
}

~

2a.
The function name seems a bit misleading because it is not really
"starting" anything here - it is just more "initialization" code,
right? Nor is it common to all kinds of LogRepWorker. Maybe the
function could be named something else like 'InitApplyOrSyncWorker()'.
-- see also #2c

~

2b.
Should this have Assert to ensure this is only called from leader
apply or tablesync? -- see also #2c

~

2c.
IMO maybe the best/tidiest way to do this is not to introduce a new
function at all. Instead, just put all this "common init" code into
the existing "common init" function ('InitializeLogRepWorker') and
execute it only if (am_tablesync_worker() || am_leader_apply_worker())
{ }.

==
src/include/replication/worker_internal.h

3.
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
 XLogRecPtr remote_lsn);
+extern void set_stream_options(WalRcvStreamOptions *options,
+char *slotname,
+XLogRecPtr *origin_startpos);
+
+extern void start_apply(XLogRecPtr origin_startpos);
+extern void DisableSubscriptionAndExit(void);
+extern void StartLogRepWorker(int worker_slot);

This placement (esp. with the missing whitespace) seems to be grouping
the set_stream_options with the other 'pa' externs, which are all
under the comment "/* Parallel apply worker setup and interactions
*/".

Putting all these up near the other "extern void
InitializeLogRepWorker(void)" might be less ambiguous.

--
[1] worker name in errmsg -
https://www.postgresql.org/message-id/CAA4eK1%2B%2BwkxxMjsPh-z2aKa9ZjNhKsjv0Tnw%2BTVX-hCBkDHusw%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Peter Smith
On Thu, Jul 20, 2023 at 11:41 PM Melih Mutlu  wrote:
>
> Hi,
>
> Attached the updated patches with recent reviews addressed.
>
> See below for my comments:
>
> Peter Smith , 19 Tem 2023 Çar, 06:08 tarihinde şunu 
> yazdı:
>>
>> Some review comments for v19-0001
>>
>> 2. LogicalRepSyncTableStart
>>
>> /*
>> * Finally, wait until the leader apply worker tells us to catch up and
>> * then return to let LogicalRepApplyLoop do it.
>> */
>> wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
>>
>> ~
>>
>> Should LogicalRepApplyLoop still be mentioned here, since that is
>> static in worker.c? Maybe it is better to refer instead to the common
>> 'start_apply' wrapper? (see also #5a below)
>
>
> Isn't' LogicalRepApplyLoop static on HEAD and also mentioned in the exact 
> comment in tablesync.c while the common "start_apply" function also exists? 
> I'm not sure how such a change would be related to this patch.
>

Fair enough. I thought it was questionable for one module to refer to
another module's static functions, but you are correct - it is not
really related to your patch. Sorry for the noise.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Melih Mutlu
Hi,

Attached the updated patches with recent reviews addressed.

See below for my comments:

Peter Smith , 19 Tem 2023 Çar, 06:08 tarihinde şunu
yazdı:

> Some review comments for v19-0001
>
> 2. LogicalRepSyncTableStart
>
> /*
> * Finally, wait until the leader apply worker tells us to catch up and
> * then return to let LogicalRepApplyLoop do it.
> */
> wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
>
> ~
>
> Should LogicalRepApplyLoop still be mentioned here, since that is
> static in worker.c? Maybe it is better to refer instead to the common
> 'start_apply' wrapper? (see also #5a below)


Isn't' LogicalRepApplyLoop static on HEAD and also mentioned in the exact
comment in tablesync.c while the common "start_apply" function also exists?
I'm not sure how such a change would be related to this patch.

---

5.
> + /* Found a table for next iteration */
> + finish_sync_worker(true);
> +
> + StartTransactionCommand();
> + ereport(LOG,
> + (errmsg("logical replication worker for subscription \"%s\" will be
> reused to sync table \"%s\" with relid %u.",
> + MySubscription->name,
> + get_rel_name(MyLogicalRepWorker->relid),
> + MyLogicalRepWorker->relid)));
> + CommitTransactionCommand();
> +
> + done = false;
> + break;
> + }
> + LWLockRelease(LogicalRepWorkerLock);


> 5b.
> Isn't there a missing call to that LWLockRelease, if the 'break' happens?


Lock is already released before break, if that's the lock you meant:

/* Update worker state for the next table */
> MyLogicalRepWorker->relid = rstate->relid;
> MyLogicalRepWorker->relstate = rstate->state;
> MyLogicalRepWorker->relstate_lsn = rstate->lsn;
> LWLockRelease(LogicalRepWorkerLock);


> /* Found a table for next iteration */
> finish_sync_worker(true);
> done = false;
> break;


---

2.
> As for the publisher node, this patch allows to reuse logical
> walsender processes
> after the streaming is done once.


> ~


> Is this paragraph even needed? Since the connection is reused then it
> already implies the other end (the Wlasender) is being reused, right?


I actually see no harm in explaining this explicitly.


Thanks,
-- 
Melih Mutlu
Microsoft


v21-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch
Description: Binary data


v21-0002-Reuse-Tablesync-Workers.patch
Description: Binary data


v21-0003-Reuse-connection-when-tablesync-workers-change-t.patch
Description: Binary data


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Amit Kapila
On Thu, Jul 20, 2023 at 5:12 PM Melih Mutlu  wrote:
>
> Peter Smith , 20 Tem 2023 Per, 05:41 tarihinde şunu 
> yazdı:
>>
>> 7. InitializeLogRepWorker
>>
>>   if (am_tablesync_worker())
>>   ereport(LOG,
>> - (errmsg("logical replication worker for subscription \"%s\", table
>> \"%s\" has started",
>> + (errmsg("logical replication worker for subscription \"%s\", table
>> \"%s\" with relid %u has started",
>>   MySubscription->name,
>> - get_rel_name(MyLogicalRepWorker->relid;
>> + get_rel_name(MyLogicalRepWorker->relid),
>> + MyLogicalRepWorker->relid)));
>>
>> But this is certainly a tablesync worker so the message here should
>> say "logical replication table synchronization worker" like the HEAD
>> code used to do.
>>
>> It seems this mistake was introduced in patch v20-0001.
>
>
> I'm a bit confused here. Isn't it decided to use "logical replication worker" 
> regardless of the worker's type [1]. That's why I made this change. If that's 
> not the case here, I'll put it back.
>

I feel where the worker type is clear, it is better to use it unless
the same can lead to translation issues.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Melih Mutlu
Hi,

Peter Smith , 20 Tem 2023 Per, 05:41 tarihinde şunu
yazdı:

> 7. InitializeLogRepWorker
>
>   if (am_tablesync_worker())
>   ereport(LOG,
> - (errmsg("logical replication worker for subscription \"%s\", table
> \"%s\" has started",
> + (errmsg("logical replication worker for subscription \"%s\", table
> \"%s\" with relid %u has started",
>   MySubscription->name,
> - get_rel_name(MyLogicalRepWorker->relid;
> + get_rel_name(MyLogicalRepWorker->relid),
> + MyLogicalRepWorker->relid)));
>
> But this is certainly a tablesync worker so the message here should
> say "logical replication table synchronization worker" like the HEAD
> code used to do.
>
> It seems this mistake was introduced in patch v20-0001.
>

I'm a bit confused here. Isn't it decided to use "logical replication
worker" regardless of the worker's type [1]. That's why I made this change.
If that's not the case here, I'll put it back.

[1]
https://www.postgresql.org/message-id/flat/CAHut%2BPt1xwATviPGjjtJy5L631SGf3qjV9XUCmxLu16cHamfgg%40mail.gmail.com

Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-20 Thread Melih Mutlu
Hi Peter,

Peter Smith , 20 Tem 2023 Per, 07:10 tarihinde şunu
yazdı:

> Hi, I had a look at the latest 3 patch (v20-0003).
>
> Although this patch was recently modified, the updates are mostly only
> to make it compatible with the updated v20-0002 patch. Specifically,
> the v20-0003 updates did not yet address my review comments from
> v17-0003 [1].
>

Yes, I only addressed your reviews for 0001 and 0002, and rebased 0003 in
latest patches as stated here [1].

I'll update the patch soon according to recent reviews, including yours for
0003.


[1]
https://www.postgresql.org/message-id/CAGPVpCTvALKEXe0%3DN-%2BiMmVxVQ-%2BP8KZ_1qQ1KsSSZ-V9wJ5hw%40mail.gmail.com

Thanks for the reminder.
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-19 Thread Peter Smith
Hi, I had a look at the latest 3 patch (v20-0003).

Although this patch was recently modified, the updates are mostly only
to make it compatible with the updated v20-0002 patch. Specifically,
the v20-0003 updates did not yet address my review comments from
v17-0003 [1].

Anyway, this post is just a reminder so the earlier review doesn't get
forgotten.

--
[1] v17-0003 review -
https://www.postgresql.org/message-id/CAHut%2BPuMAiO_X_Kw6ud-jr5WOm%2Brpkdu7CppDU6mu%3DgY7UVMzQ%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-19 Thread Amit Kapila
On Thu, Jul 20, 2023 at 8:02 AM Peter Smith  wrote:
>
> On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu  wrote:
> >
> > Hi,
> >
> > PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's 
> > reviews for 0001 and 0002 with some small comments below.
> >
> > Peter Smith , 10 Tem 2023 Pzt, 10:09 tarihinde şunu 
> > yazdı:
> >>
> >> 6. LogicalRepApplyLoop
> >>
> >> + /*
> >> + * apply_dispatch() may have gone into apply_handle_commit()
> >> + * which can call process_syncing_tables_for_sync.
> >> + *
> >> + * process_syncing_tables_for_sync decides whether the sync of
> >> + * the current table is completed. If it is completed,
> >> + * streaming must be already ended. So, we can break the loop.
> >> + */
> >> + if (MyLogicalRepWorker->is_sync_completed)
> >> + {
> >> + endofstream = true;
> >> + break;
> >> + }
> >> +
> >>
> >> and
> >>
> >> + /*
> >> + * If is_sync_completed is true, this means that the tablesync
> >> + * worker is done with synchronization. Streaming has already been
> >> + * ended by process_syncing_tables_for_sync. We should move to the
> >> + * next table if needed, or exit.
> >> + */
> >> + if (MyLogicalRepWorker->is_sync_completed)
> >> + endofstream = true;
> >>
> >> ~
> >>
> >> Instead of those code fragments above assigning 'endofstream' as a
> >> side-effect, would it be the same (but tidier) to just modify the
> >> other "breaking" condition below:
> >>
> >> BEFORE:
> >> /* Check if we need to exit the streaming loop. */
> >> if (endofstream)
> >> break;
> >>
> >> AFTER:
> >> /* Check if we need to exit the streaming loop. */
> >> if (endofstream || MyLogicalRepWorker->is_sync_completed)
> >> break;
> >
> >
> > First place you mentioned also breaks the infinite loop. Such an if 
> > statement is needed there with or without endofstream assignment.
> >
> > I think if there is a flag to break a loop, using that flag to indicate 
> > that we should exit the loop seems more appropriate to me. I see that it 
> > would be a bit tidier without endofstream = true lines, but I feel like it 
> > would also be less readable.
> >
> > I don't have a strong opinion though. I'm just keeping them as they are for 
> > now, but I can change them if you disagree.
> >
>
> I felt it was slightly sneaky to re-use the existing variable as a
> convenient way to do what you want. But, I don’t feel strongly enough
> on this point to debate it -- maybe see later if others have an
> opinion about this.
>

I feel it is okay to use the existing variable 'endofstream' here but
shall we have an assertion that it is a tablesync worker?

> >>
> >>
> >> 10b.
> >> All the other tablesync-related fields of this struct are named as
> >> relXXX, so I wonder if is better for this to follow the same pattern.
> >> e.g. 'relsync_completed'
> >
> >
> > Aren't those start with rel because they're related to the relation that 
> > the tablesync worker is syncing? is_sync_completed is not a relation 
> > specific field. I'm okay with changing the name but feel like 
> > relsync_completed would be misleading.
>
> My reading of the code is slightly different: Only these fields have
> the prefix ‘rel’ and they are all grouped under the comment “/* Used
> for initial table synchronization. */” because AFAIK only these fields
> are TWS specific (not used for other kinds of workers).
>
> Since this new flag field is also TWS-specific, therefore IMO it
> should follow the same consistent name pattern. But, if you are
> unconvinced, maybe see later if others have an opinion about it.
>

+1 to use the prefix 'rel' here as the sync is specific to the
relation. Even during apply phase, we will apply the relation-specific
changes. See should_apply_changes_for_rel().

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-19 Thread Peter Smith
Some review comments for patch v20-0002

==
src/backend/replication/logical/tablesync.c

1. finish_sync_worker
/*
 * Exit routine for synchronization worker.
 *
 * If reuse_worker is false, the worker will not be reused and exit.
 */

~

IMO the "will not be reused" part doesn't need saying -- it is
self-evident from the fact "reuse_worker is false".

SUGGESTION
If reuse_worker is false, at the conclusion of this function the
worker process will exit.

~~~

2. finish_sync_worker

- StartTransactionCommand();
- ereport(LOG,
- (errmsg("logical replication table synchronization worker for
subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid;
- CommitTransactionCommand();
-
  /* Find the leader apply worker and signal it. */
  logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

- /* Stop gracefully */
- proc_exit(0);
+ if (!reuse_worker)
+ {
+ StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\" has finished",
+ MySubscription->name)));
+ CommitTransactionCommand();
+
+ /* Stop gracefully */
+ proc_exit(0);
+ }

In the HEAD code the log message came *before* it signalled to the
apply leader. Won't it be better to keep the logic in that same order?

~~~

3. process_syncing_tables_for_sync

- finish_sync_worker();
+ /* Sync worker has completed synchronization of the current table. */
+ MyLogicalRepWorker->is_sync_completed = true;
+
+ ereport(LOG,
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\", relation \"%s\" with relid %u has finished",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

IIUC it is only the " table synchronization" part that is finished
here; not the whole "table synchronization worker" (compared to
finish_sync_worker function), so maybe the word "worker"  should not
be in this message.

~~~

4. TablesyncWorkerMain

+ if (MyLogicalRepWorker->is_sync_completed)
+ {
+ /* tablesync is done unless a table that needs syncning is found */
+ done = true;

SUGGESTION (Typo "syncning" and minor rewording.)
This tablesync worker is 'done' unless another table that needs
syncing is found.

~

5.
+ /* Found a table for next iteration */
+ finish_sync_worker(true);
+
+ StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("logical replication worker for subscription \"%s\" will be
reused to sync table \"%s\" with relid %u.",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();
+
+ done = false;
+ break;
+ }
+ LWLockRelease(LogicalRepWorkerLock);

5a.
IMO it seems better to put this ereport *inside* the
finish_sync_worker() function alongside the similar log for when the
worker is not reused.

~

5b.
Isn't there a missing call to that LWLockRelease, if the 'break' happens?

==
src/backend/replication/logical/worker.c

6. LogicalRepApplyLoop

Refer to [1] for my reply to a previous review comment

~~~

7. InitializeLogRepWorker

  if (am_tablesync_worker())
  ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\", table
\"%s\" has started",
+ (errmsg("logical replication worker for subscription \"%s\", table
\"%s\" with relid %u has started",
  MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid;
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));

But this is certainly a tablesync worker so the message here should
say "logical replication table synchronization worker" like the HEAD
code used to do.

It seems this mistake was introduced in patch v20-0001.

==
src/include/replication/worker_internal.h

8.
Refer to [1] for my reply to a previous review comment

--
[1] Replies to previous 0002 comments --
https://www.postgresql.org/message-id/CAHut%2BPtiAtGJC52SGNdobOah5ctYDDhWWKd%3DuP%3DrkRgXzg5rdg%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-19 Thread Peter Smith
On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu  wrote:
>
> Hi,
>
> PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's 
> reviews for 0001 and 0002 with some small comments below.
>
> Peter Smith , 10 Tem 2023 Pzt, 10:09 tarihinde şunu 
> yazdı:
>>
>> 6. LogicalRepApplyLoop
>>
>> + /*
>> + * apply_dispatch() may have gone into apply_handle_commit()
>> + * which can call process_syncing_tables_for_sync.
>> + *
>> + * process_syncing_tables_for_sync decides whether the sync of
>> + * the current table is completed. If it is completed,
>> + * streaming must be already ended. So, we can break the loop.
>> + */
>> + if (MyLogicalRepWorker->is_sync_completed)
>> + {
>> + endofstream = true;
>> + break;
>> + }
>> +
>>
>> and
>>
>> + /*
>> + * If is_sync_completed is true, this means that the tablesync
>> + * worker is done with synchronization. Streaming has already been
>> + * ended by process_syncing_tables_for_sync. We should move to the
>> + * next table if needed, or exit.
>> + */
>> + if (MyLogicalRepWorker->is_sync_completed)
>> + endofstream = true;
>>
>> ~
>>
>> Instead of those code fragments above assigning 'endofstream' as a
>> side-effect, would it be the same (but tidier) to just modify the
>> other "breaking" condition below:
>>
>> BEFORE:
>> /* Check if we need to exit the streaming loop. */
>> if (endofstream)
>> break;
>>
>> AFTER:
>> /* Check if we need to exit the streaming loop. */
>> if (endofstream || MyLogicalRepWorker->is_sync_completed)
>> break;
>
>
> First place you mentioned also breaks the infinite loop. Such an if statement 
> is needed there with or without endofstream assignment.
>
> I think if there is a flag to break a loop, using that flag to indicate that 
> we should exit the loop seems more appropriate to me. I see that it would be 
> a bit tidier without endofstream = true lines, but I feel like it would also 
> be less readable.
>
> I don't have a strong opinion though. I'm just keeping them as they are for 
> now, but I can change them if you disagree.
>

I felt it was slightly sneaky to re-use the existing variable as a
convenient way to do what you want. But, I don’t feel strongly enough
on this point to debate it -- maybe see later if others have an
opinion about this.

>>
>>
>> 10b.
>> All the other tablesync-related fields of this struct are named as
>> relXXX, so I wonder if is better for this to follow the same pattern.
>> e.g. 'relsync_completed'
>
>
> Aren't those start with rel because they're related to the relation that the 
> tablesync worker is syncing? is_sync_completed is not a relation specific 
> field. I'm okay with changing the name but feel like relsync_completed would 
> be misleading.

My reading of the code is slightly different: Only these fields have
the prefix ‘rel’ and they are all grouped under the comment “/* Used
for initial table synchronization. */” because AFAIK only these fields
are TWS specific (not used for other kinds of workers).

Since this new flag field is also TWS-specific, therefore IMO it
should follow the same consistent name pattern. But, if you are
unconvinced, maybe see later if others have an opinion about it.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-18 Thread Amit Kapila
On Wed, Jul 19, 2023 at 8:38 AM Peter Smith  wrote:
>
> Some review comments for v19-0001
>
...
> ==
> src/backend/replication/logical/worker.c
>
> 3. set_stream_options
>
> +/*
> + * Sets streaming options including replication slot name and origin start
> + * position. Workers need these options for logical replication.
> + */
> +void
> +set_stream_options(WalRcvStreamOptions *options,
>
> I'm not sure if the last sentence of the comment is adding anything useful.
>

Personally, I find it useful as at a high-level it tells the purpose
of setting these options.

> ~~~
>
> 4. start_apply
> /*
>  * Run the apply loop with error handling. Disable the subscription,
>  * if necessary.
>  *
>  * Note that we don't handle FATAL errors which are probably because
>  * of system resource error and are not repeatable.
>  */
> void
> start_apply(XLogRecPtr origin_startpos)
>
> ~
>
> 4a.
> Somehow I found the function names to be confusing. Intuitively (IMO)
> 'start_apply' is for apply worker and 'start_tablesync' is for
> tablesync worker. But actually, the start_apply() function is the
> *common* function for both kinds of worker. Might be easier to
> understand if start_apply function name can be changed to indicate it
> is really common -- e.g. common_apply_loop(), or similar.
>
> ~
>
> 4b.
> If adverse to changing the function name, it might be helpful anyway
> if the function comment can emphasize this function is shared by
> different worker types. e.g. "Common function to run the apply
> loop..."
>

I would prefer to change the comments as suggested by you in 4b
because both the workers (apply and tablesync) need to perform apply,
so it seems logical for both of them to invoke start_apply.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-18 Thread Peter Smith
Some review comments for v19-0001

==
src/backend/replication/logical/tablesync.c

1. run_tablesync_worker
+run_tablesync_worker(WalRcvStreamOptions *options,
+ char *slotname,
+ char *originname,
+ int originname_size,
+ XLogRecPtr *origin_startpos)
+{
+ /* Start table synchronization. */
+ start_table_sync(origin_startpos, );

There was no such comment ("/* Start table synchronization. */") in
the original HEAD code, so I didn't see that it adds much value by
adding it in the refactored code.

~~~

2. LogicalRepSyncTableStart

/*
* Finally, wait until the leader apply worker tells us to catch up and
* then return to let LogicalRepApplyLoop do it.
*/
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);

~

Should LogicalRepApplyLoop still be mentioned here, since that is
static in worker.c? Maybe it is better to refer instead to the common
'start_apply' wrapper? (see also #5a below)

==
src/backend/replication/logical/worker.c

3. set_stream_options

+/*
+ * Sets streaming options including replication slot name and origin start
+ * position. Workers need these options for logical replication.
+ */
+void
+set_stream_options(WalRcvStreamOptions *options,

I'm not sure if the last sentence of the comment is adding anything useful.

~~~

4. start_apply
/*
 * Run the apply loop with error handling. Disable the subscription,
 * if necessary.
 *
 * Note that we don't handle FATAL errors which are probably because
 * of system resource error and are not repeatable.
 */
void
start_apply(XLogRecPtr origin_startpos)

~

4a.
Somehow I found the function names to be confusing. Intuitively (IMO)
'start_apply' is for apply worker and 'start_tablesync' is for
tablesync worker. But actually, the start_apply() function is the
*common* function for both kinds of worker. Might be easier to
understand if start_apply function name can be changed to indicate it
is really common -- e.g. common_apply_loop(), or similar.

~

4b.
If adverse to changing the function name, it might be helpful anyway
if the function comment can emphasize this function is shared by
different worker types. e.g. "Common function to run the apply
loop..."

~~~

5. run_apply_worker

+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+originname, originname_size);
+
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();

Even if you wish ReplicationOriginNameForLogicalRep() to be outside of
the transaction I thought it should still come *after* the comment,
same as it does in the HEAD code.

~~~

6. ApplyWorkerMain

- /* Run the main loop. */
- start_apply(origin_startpos);
+ /* This is leader apply worker */
+ run_apply_worker(, myslotname, originname,
sizeof(originname), _startpos);

  proc_exit(0);
 }

~

6a.
The comment "/* This is leader apply worker */" is redundant now. This
function is the entry point for leader apply workers so it can't be
anything else.

~

6b.

Caller parameter wrapping differs from the similar code in
TablesyncWorkerMain. Shouldn't they be similar?

e.g.
+ run_apply_worker(, myslotname, originname,
sizeof(originname), _startpos);

versus
+ run_tablesync_worker(,
+ myslotname,
+ originname,
+ sizeof(originname),
+ _startpos);

==
src/include/replication/worker_internal.h

7.
+
+extern void set_stream_options(WalRcvStreamOptions *options,
+char *slotname,
+XLogRecPtr *origin_startpos);
+extern void start_apply(XLogRecPtr origin_startpos);
+extern void DisableSubscriptionAndExit(void);
+

Maybe all the externs belong together? It doesn't seem right for just
these 3 externs to be separated from all the others, with those static
inline functions in-between.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-18 Thread vignesh C
On Tue, 11 Jul 2023 at 08:30, Peter Smith  wrote:
>
> On Tue, Jul 11, 2023 at 12:31 AM Melih Mutlu  wrote:
> >
> > Hi,
> >
> > Hayato Kuroda (Fujitsu) , 6 Tem 2023 Per,
> > 12:47 tarihinde şunu yazdı:
> > >
> > > Dear Melih,
> > >
> > > > Thanks for the 0003 patch. But it did not work for me. Can you create
> > > > a subscription successfully with patch 0003 applied?
> > > > I get the following error: " ERROR:  table copy could not start
> > > > transaction on publisher: another command is already in progress".
> > >
> > > You got the ERROR when all the patches (0001-0005) were applied, right?
> > > I have focused on 0001 and 0002 only, so I missed something.
> > > If it was not correct, please attach the logfile and test script what you 
> > > did.
> >
> > Yes, I did get an error with all patches applied. But with only 0001
> > and 0002, your version seems like working and mine does not.
> > What do you think about combining 0002 and 0003? Or should those stay 
> > separate?
> >
>
> Even if patches 0003 and 0002 are to be combined, I think that should
> not happen until after the "reuse" design is confirmed which way is
> best.
>
> e.g. IMO it might be easier to compare the different PoC designs for
> patch 0002 if there is no extra logic involved.
>
> PoC design#1 -- each tablesync decides for itself what to do next
> after it finishes
> PoC design#2 -- reuse tablesync using a "pool" of available workers

I did a POC for design#2 for implementing a worker pool to synchronize
the tables for a subscriber. The core design is the same as what Melih
had implemented at [1]. I had already started the implementation of
POC based on one of the earlier e-mail [2] Peter had shared.
The POC has been implemented like:
a) Apply worker will check the tablesync pool and see if any tablesync
worker is free:
 i) If there are no free workers in the pool, start a table sync
worker and add it to the table sync pool.
 ii) If there are free workers in the pool, re-use the tablesync
worker for synchronizing another table.
b) Apply worker will check if the tables are synchronized, if all the
tables are synchronized apply worker will release all the workers from
the tablesync pool
c) Apply worker and tablesync worker has shared memory to share the
following relation data and execution state between the apply worker
and the tablesync worker
d) The apply worker and tablesync worker's pid are also stored in the
shared memory so that we need not take a lock on LogicalRepWorkerLock
and loop on max_logical_replication_workers every time. We use the pid
stored in shared memory to wake up the apply worker and tablesync
worker whenever needed.

While I was implementing the POC I found one issue in the POC
patch(there is no problem with the HEAD code, issue was only with the
POC):
1) Apply worker was waiting for the table to be set to SYNCDONE.
2) Mean time tablesync worker sets the table to SYNCDONE and sets
apply worker's latch.
3) Apply worker will reset the latch set by tablesync and go to main
loop and wait in main loop latch(since tablesync worker's latch was
already reset, apply worker will wait for 1 second)
To fix this I had to set apply worker's latch once in 1ms in this case
alone which is not a good solution as it will consume a lot of cpu
cycles. A better fix for this would be to introduce a new subscription
relation state.

Attached patch has the changes for the same. 001, 0002 and 0003 are
the patches shared by Melih and Kuroda-san earlier. 0004 patch has the
changes for the POC of Tablesync worker pool implementation.
POC design 1: Tablesync worker identifies the tables that should be
synced and reuses the connection.
POC design 2: Tablesync worker pool with apply worker scheduling the
work to tablesync workers in the tablesync pool and reusing the
connection.
Performance results for 10 empty tables:
+---+++--++
|| 2 sync workers | 4 sync workers | 8 sync
workers  | 16 sync workers|
+---+++--++
| HEAD  | 128.4685 ms| 121.271 ms | 136.5455 ms
 |   N/A  |
+---+++--++
| POC design#1|  70.7095 ms|  80.9805 ms| 102.773  ms   |
 N/A  |
+---+++--++
| POC design#2|  70.858 ms  |  83.0845 ms| 112.505 ms|
  N/A  |
+---+++--++

Performance results for 100 empty tables:
+---+++--++
| | 2 sync workers | 4 sync workers | 8 sync
workers | 16 sync workers|

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-18 Thread Amit Kapila
On Tue, Jul 18, 2023 at 2:33 PM Melih Mutlu  wrote:
>
> Attached the fixed patchset.
>

Few comments on 0001

1.
+ logicalrep_worker_attach(worker_slot);
+
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGTERM, die);
+ BackgroundWorkerUnblockSignals();
+
+ /*
+ * We don't currently need any ResourceOwner in a walreceiver process, but
+ * if we did, we could call CreateAuxProcessResourceOwner here.
+ */
+
+ /* Initialise stats to a sanish value */
+ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+ MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ InitializeLogRepWorker();
+
+ /* Connect to the origin and start the replication. */
+ elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
+ MySubscription->conninfo);
+
+ /*
+ * Setup callback for syscache so that we know when something changes in
+ * the subscription relation state.
+ */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+   invalidate_syncing_table_states,
+   (Datum) 0);

It seems this part of the code is the same for ApplyWorkerMain() and
TablesyncWorkerMain(). So, won't it be better to move it into a common
function?

2. Can LogicalRepSyncTableStart() be static function?

3. I think you don't need to send 0004, 0005 each time till we are
able to finish patches till 0003.

4. In 0001's commit message, you can say that it will help the
upcoming reuse tablesync worker patch.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-17 Thread Peter Smith
On Tue, Jul 18, 2023 at 11:25 AM Peter Smith  wrote:
>
> On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu  wrote:
> >
> > Hi,
> >
> > PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's 
> > reviews for 0001 and 0002 with some small comments below.
> >
>
> Thanks, I will take another look at these soon. FYI, the 0001 patch
> does not apply cleanly. It needs to be rebased again because
> get_worker_name() function was recently removed from HEAD.
>

Sorry, to be more correct -- it applied OK, but failed to build.

> replication/logical/worker.o: In function `InitializeLogRepWorker':
> /home/postgres/oss_postgres_misc/src/backend/replication/logical/worker.c:4605:
> undefined reference to `get_worker_name'
>
> --
> Kind Regards,
> Peter Smith.
> Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-17 Thread Peter Smith
On Tue, Jul 18, 2023 at 1:54 AM Melih Mutlu  wrote:
>
> Hi,
>
> PFA updated patches. Rebased 0003 with minor changes. Addressed Peter's 
> reviews for 0001 and 0002 with some small comments below.
>

Thanks, I will take another look at these soon. FYI, the 0001 patch
does not apply cleanly. It needs to be rebased again because
get_worker_name() function was recently removed from HEAD.

replication/logical/worker.o: In function `InitializeLogRepWorker':
/home/postgres/oss_postgres_misc/src/backend/replication/logical/worker.c:4605:
undefined reference to `get_worker_name'

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-15 Thread Amit Kapila
On Fri, Jul 14, 2023 at 3:07 PM Melih Mutlu  wrote:
>
> Amit Kapila , 14 Tem 2023 Cum, 11:11 tarihinde şunu 
> yazdı:
>>
>> Yeah, it is quite surprising that Design#2 is worse than master. I
>> suspect there is something wrong going on with your Design#2 patch.
>> One area to check is whether apply worker is able to quickly assign
>> the new relations to tablesync workers. Note that currently after the
>> first time assigning the tables to workers, the apply worker may wait
>> before processing the next set of tables in the main loop of
>> LogicalRepApplyLoop(). The other minor point about design#2
>> implementation is that you may want to first assign the allocated
>> tablesync workers before trying to launch a new worker.
>
>
> It's not actually worse than master all the time. It seems like it's just 
> unreliable.
> Here are some consecutive runs for both designs and master.
>
> design#1 = 1621,527 ms, 1788,533 ms, 1645,618 ms, 1702,068 ms, 1745,753 ms
> design#2 = 2089,077 ms, 1864,571 ms, 4574,799 ms, 5422,217 ms, 1905,944 ms
> master = 2815,138 ms, 2481,954 ms , 2594,413 ms, 2620,690 ms, 2489,323 ms
>
> And apply worker was not busy with applying anything during these experiments 
> since there were not any writes to the publisher. I'm not sure how that would 
> also affect the performance if there were any writes.
>

Yeah, this is a valid point. I think this is in favor of the Design#1
approach we are discussing here. One thing I was thinking whether we
can do anything to alleviate the contention at the higher worker
count. One possibility is to have some kind of available worker list
which can be used to pick up the next worker instead of checking all
the workers while assigning the next table. We can probably explore it
separately once the first three patches are ready because anyway, this
will be an optimization atop the Design#1 approach.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-14 Thread Melih Mutlu
Hi,

Amit Kapila , 14 Tem 2023 Cum, 11:11 tarihinde
şunu yazdı:

> Yeah, it is quite surprising that Design#2 is worse than master. I
> suspect there is something wrong going on with your Design#2 patch.
> One area to check is whether apply worker is able to quickly assign
> the new relations to tablesync workers. Note that currently after the
> first time assigning the tables to workers, the apply worker may wait
> before processing the next set of tables in the main loop of
> LogicalRepApplyLoop(). The other minor point about design#2
> implementation is that you may want to first assign the allocated
> tablesync workers before trying to launch a new worker.
>

It's not actually worse than master all the time. It seems like it's just
unreliable.
Here are some consecutive runs for both designs and master.

design#1 = 1621,527 ms, 1788,533 ms, 1645,618 ms, 1702,068 ms, 1745,753 ms
design#2 = 2089,077 ms, 1864,571 ms, 4574,799 ms, 5422,217 ms, 1905,944 ms
master = 2815,138 ms, 2481,954 ms , 2594,413 ms, 2620,690 ms, 2489,323 ms

And apply worker was not busy with applying anything during these
experiments since there were not any writes to the publisher. I'm not sure
how that would also affect the performance if there were any writes.

Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-14 Thread Peter Smith
Hi Kuroda-san.

Here are some review comments for the v17-0003 patch. They are all minor.

==
Commit message

1.
Previously tablesync workers establish new connections when it changes
the syncing
table, but this might have additional overhead. This patch allows to
reuse connections
instead.

~

/This patch allows to reuse connections instead./This patch allows the
existing connection to be reused./

~~~

2.
As for the publisher node, this patch allows to reuse logical
walsender processes
after the streaming is done once.

~

Is this paragraph even needed? Since the connection is reused then it
already implies the other end (the Wlasender) is being reused, right?

==
src/backend/replication/logical/tablesync.c

3.
+ * FIXME: set appropriate application_name. Previously, the slot name was used
+ * because the lifetime of the tablesync worker was same as that, but now the
+ * tablesync worker handles many slots during the synchronization so that it is
+ * not suitable. So what should be? Note that if the tablesync worker starts to
+ * reuse the replication slot during synchronization, we should use the slot
+ * name as application_name again.
+ */
+static void
+ApplicationNameForTablesync(Oid suboid, int worker_slot,
+ char *application_name, Size szapp)

3a.
I felt that most of this FIXME comment belongs with the calling code,
not here.

3b.
Also, maybe it needs some rewording -- I didn't understand exactly
what it is trying to say.


~~~

4.
- /*
- * Here we use the slot name instead of the subscription name as the
- * application_name, so that it is different from the leader apply worker,
- * so that synchronous replication can distinguish them.
- */
- LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true,
-must_use_password,
-slotname, );
+ /* Connect to the publisher if haven't done so already. */
+ if (LogRepWorkerWalRcvConn == NULL)
+ {
+ char application_name[NAMEDATALEN];
+
+ /*
+ * The application_name must be also different from the leader apply
+ * worker because synchronous replication must distinguish them.
+ */
+ ApplicationNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->worker_slot,
+ application_name,
+ NAMEDATALEN);
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true,
+must_use_password,
+application_name, );
+ }
+

Should the comment mention the "subscription name" as it did before?

SUGGESTION
The application_name must differ from the subscription name (used by
the leader apply worker) because synchronous replication has to be
able to distinguish this worker from the leader apply worker.

==
src/backend/replication/logical/worker.c

5.
-start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+start_table_sync(XLogRecPtr *origin_startpos,
+ char **myslotname)

This is a wrapping change only. It looks like an unnecessary hangover
from a previous version of 0003.

==
src/backend/replication/walsender.c

6. exec_replication_command

+
  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
  StartReplication(cmd);
~

The extra blank line does not belong in this patch.

==
src/include/replication/worker_internal.h

+ /* Indicates the slot number which corresponds to this LogicalRepWorker. */
+ int worker_slot;
+

6a
I think this field is very fundamental, so IMO it should be defined at
the top of the struct, maybe nearby the other 'in_use' and
'generation' fields.

~

6b.
Also, since this is already a "worker" struct so there is no need to
have "worker" in the field name again -- just "slot_number" or
"slotnum" might be a better name.

And then the comment can also be simplified.

SUGGESTION
/* Slot number of this worker. */
int slotnum;

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-14 Thread Amit Kapila
On Fri, Jul 14, 2023 at 1:58 AM Melih Mutlu  wrote:
>
> Here are some quick numbers with 100 empty tables.
>
> +--++++
> |  | 2 sync workers | 4 sync workers | 8 sync workers |
> +--++++
> | POC design#1 | 1909.873 ms| 986.261 ms | 552.404 ms |
> +--++++
> | POC design#2 | 4962.208 ms| 1240.503 ms| 1165.405 ms|
> +--++++
> | master   | 2666.008 ms| 1462.012 ms| 986.848 ms |
> +--++++
>
> Seems like design#1 is better than both design#2 and master overall. It's 
> surprising to see that even master beats design#2 in some cases though. Not 
> sure if that is expected or there are some places to improve design#2 even 
> more.
>

Yeah, it is quite surprising that Design#2 is worse than master. I
suspect there is something wrong going on with your Design#2 patch.
One area to check is whether apply worker is able to quickly assign
the new relations to tablesync workers. Note that currently after the
first time assigning the tables to workers, the apply worker may wait
before processing the next set of tables in the main loop of
LogicalRepApplyLoop(). The other minor point about design#2
implementation is that you may want to first assign the allocated
tablesync workers before trying to launch a new worker.

>
> PS: I only attached the related patches and not the whole patch set. 0001 and 
> 0002 may contain some of your earlier reviews, but I'll send a proper updated 
> set soon.
>

Yeah, that would be helpful.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-13 Thread Melih Mutlu
Hi Peter,

Peter Smith , 11 Tem 2023 Sal, 05:59 tarihinde şunu
yazdı:
> Even if patches 0003 and 0002 are to be combined, I think that should
> not happen until after the "reuse" design is confirmed which way is
> best.
>
> e.g. IMO it might be easier to compare the different PoC designs for
> patch 0002 if there is no extra logic involved.
>
> PoC design#1 -- each tablesync decides for itself what to do next
> after it finishes
> PoC design#2 -- reuse tablesync using a "pool" of available workers

Right. I made a patch 0003 to change 0002 so that tables will be assigned
to sync workers by apply worker.
It's a rough POC and ignores some edge cases. But this is what I think how
apply worker would take the responsibility of table assignments. Hope the
implementation makes sense and I'm not missing anything that may cause
degraded perforrmance.

PoC design#1 --> apply only patch 0001 and 0002
PoC design#2 --> apply all patches, 0001, 0002 and 0003

Here are some quick numbers with 100 empty tables.

+--++++
|  | 2 sync workers | 4 sync workers | 8 sync workers |
+--++++
| POC design#1 | 1909.873 ms| 986.261 ms | 552.404 ms |
+--++++
| POC design#2 | 4962.208 ms| 1240.503 ms| 1165.405 ms|
+--++++
| master   | 2666.008 ms| 1462.012 ms| 986.848 ms |
+--++++

Seems like design#1 is better than both design#2 and master overall. It's
surprising to see that even master beats design#2 in some cases though. Not
sure if that is expected or there are some places to improve design#2 even
more.

What do you think?

PS: I only attached the related patches and not the whole patch set. 0001
and 0002 may contain some of your earlier reviews, but I'll send a proper
updated set soon.

Thanks,
-- 
Melih Mutlu
Microsoft


v18-0002-Reuse-Tablesync-Workers.patch
Description: Binary data


v18-0001-Refactor-to-split-Apply-and-Tablesync-Workers.patch
Description: Binary data


v18-0003-apply-worker-assigns-tables.patch
Description: Binary data


RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-12 Thread Hayato Kuroda (Fujitsu)
Dear Melih,

> > > Thanks for the 0003 patch. But it did not work for me. Can you create
> > > a subscription successfully with patch 0003 applied?
> > > I get the following error: " ERROR:  table copy could not start
> > > transaction on publisher: another command is already in progress".
> >
> > You got the ERROR when all the patches (0001-0005) were applied, right?
> > I have focused on 0001 and 0002 only, so I missed something.
> > If it was not correct, please attach the logfile and test script what you 
> > did.
> 
> Yes, I did get an error with all patches applied. But with only 0001
> and 0002, your version seems like working and mine does not.

Hmm, really? IIUC I did not modify 0001 and 0002 patches, I just re-assigned the
version number. I compared between yours and mine, but no meaningful differences
were found. E.g., following command compared v4-0002 and v16-0002:

```
diff --git a/../reuse_workers/v4-0002-Reuse-Tablesync-Workers.patch 
b/../reuse_workers/hayato/v16-0002-Reuse-Tablesync-Workers.patch
index 5350216e98..7785a573e4 100644
--- a/../reuse_workers/v4-0002-Reuse-Tablesync-Workers.patch
+++ b/../reuse_workers/hayato/v16-0002-Reuse-Tablesync-Workers.patch
@@ -1,7 +1,7 @@
-From d482022b40e0a5ce1b74fd0e320cb5b45da2f671 Mon Sep 17 00:00:00 2001
+From db3e8e2d7aadea79126c5816bce8b06dc82f33c2 Mon Sep 17 00:00:00 2001
 From: Melih Mutlu 
 Date: Tue, 4 Jul 2023 22:04:46 +0300
-Subject: [PATCH 2/5] Reuse Tablesync Workers
+Subject: [PATCH v16 2/5] Reuse Tablesync Workers
 
 This commit allows reusing tablesync workers for syncing more than one
 table sequentially during their lifetime, instead of exiting after
@@ -324,5 +324,5 @@ index 7aba034774..1e9f8e6e72 100644
  static inline bool
  am_tablesync_worker(void)
 -- 
-2.25.1
+2.27.0
```

For confirmation, please attach the logfile and test script what you did
if you could reproduce?

> What do you think about combining 0002 and 0003? Or should those stay
> separate?

I have no strong opinion, but it may be useful to keep them pluggable.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-10 Thread Amit Kapila
On Mon, Jul 10, 2023 at 8:01 PM Melih Mutlu  wrote:
>
> Hayato Kuroda (Fujitsu) , 6 Tem 2023 Per,
> 12:47 tarihinde şunu yazdı:
> >
> > Dear Melih,
> >
> > > Thanks for the 0003 patch. But it did not work for me. Can you create
> > > a subscription successfully with patch 0003 applied?
> > > I get the following error: " ERROR:  table copy could not start
> > > transaction on publisher: another command is already in progress".
> >
> > You got the ERROR when all the patches (0001-0005) were applied, right?
> > I have focused on 0001 and 0002 only, so I missed something.
> > If it was not correct, please attach the logfile and test script what you 
> > did.
>
> Yes, I did get an error with all patches applied. But with only 0001
> and 0002, your version seems like working and mine does not.
> What do you think about combining 0002 and 0003? Or should those stay 
> separate?
>

I am fine either way but I think one minor advantage of keeping 0003
separate is that we can focus on some of the problems specific to that
patch. For example, the following comment in the 0003 patch: "FIXME:
set appropriate application_name...". I have given a suggestion to
address it in [1] and Kuroda-San seems to have addressed the same but
I am not sure if all of us agree with that or if there is any better
way to address it. What do you think?

>
> > * 0003 basically improved performance from first two patches
>
> Agree, 0003 is definitely a good addition which was missing earlier.
>

+1.

[1] - 
https://www.postgresql.org/message-id/CAA4eK1JOZHmy2o2F2wTCPKsjpwDiKZPOeTa_jt%3Dwm2JLbf-jsg%40mail.gmail.com

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-10 Thread Peter Smith
On Tue, Jul 11, 2023 at 12:31 AM Melih Mutlu  wrote:
>
> Hi,
>
> Hayato Kuroda (Fujitsu) , 6 Tem 2023 Per,
> 12:47 tarihinde şunu yazdı:
> >
> > Dear Melih,
> >
> > > Thanks for the 0003 patch. But it did not work for me. Can you create
> > > a subscription successfully with patch 0003 applied?
> > > I get the following error: " ERROR:  table copy could not start
> > > transaction on publisher: another command is already in progress".
> >
> > You got the ERROR when all the patches (0001-0005) were applied, right?
> > I have focused on 0001 and 0002 only, so I missed something.
> > If it was not correct, please attach the logfile and test script what you 
> > did.
>
> Yes, I did get an error with all patches applied. But with only 0001
> and 0002, your version seems like working and mine does not.
> What do you think about combining 0002 and 0003? Or should those stay 
> separate?
>

Even if patches 0003 and 0002 are to be combined, I think that should
not happen until after the "reuse" design is confirmed which way is
best.

e.g. IMO it might be easier to compare the different PoC designs for
patch 0002 if there is no extra logic involved.

PoC design#1 -- each tablesync decides for itself what to do next
after it finishes
PoC design#2 -- reuse tablesync using a "pool" of available workers

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-10 Thread Peter Smith
Here are some review comments for patch v16-3

==
1. Commit Message.

The patch description is missing.

==
2. General.

+LogicalRepSyncTableStart(XLogRecPtr *origin_startpos, int worker_slot)

and

+start_table_sync(XLogRecPtr *origin_startpos,
+ char **myslotname,
+ int worker_slot)

and

@@ -4548,12 +4552,13 @@ run_tablesync_worker(WalRcvStreamOptions *options,
  char *slotname,
  char *originname,
  int originname_size,
- XLogRecPtr *origin_startpos)
+ XLogRecPtr *origin_startpos,
+ int worker_slot)


It seems the worker_slot is being passed all over the place as an
additional function argument so that it can be used to construct an
application_name. Is it possible/better to introduce a new
'MyLogicalRepWorker' field for the 'worker_slot' so it does not have
to be passed like this?

==
src/backend/replication/logical/tablesync.c

3.
+ /*
+ * Disconnect from publisher. Otherwise reused sync workers causes
+ * exceeding max_wal_senders.
+ */
+ if (LogRepWorkerWalRcvConn != NULL)
+ {
+ walrcv_disconnect(LogRepWorkerWalRcvConn);
+ LogRepWorkerWalRcvConn = NULL;
+ }
+

Why is this comment mentioning anything about "reused workers" at all?
The worker process exits in this function, right?

~~~

4. LogicalRepSyncTableStart

  /*
- * Here we use the slot name instead of the subscription name as the
- * application_name, so that it is different from the leader apply worker,
- * so that synchronous replication can distinguish them.
+ * Connect to publisher if not yet. The application_name must be also
+ * different from the leader apply worker because synchronous replication
+ * must distinguish them.
  */

I felt all the details in the 2nd part of this comment belong inside
the condition, not outside.

SUGGESTION
/* Connect to the publisher if haven't done so already. */

~~~

5.
+ if (LogRepWorkerWalRcvConn == NULL)
+ {
+ char application_name[NAMEDATALEN];
+
+ /*
+ * FIXME: set appropriate application_name. Previously, the slot name
+ * was used becasue the lifetime of the tablesync worker was same as
+ * that, but now the tablesync worker handles many slots during the
+ * synchronization so that it is not suitable. So what should be?
+ * Note that if the tablesync worker starts to reuse the replication
+ * slot during synchronization, we should use the slot name as
+ * application_name again.
+ */
+ snprintf(application_name, NAMEDATALEN, "pg_%u_sync_%i",
+ MySubscription->oid, worker_slot);
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true,
+must_use_password,
+application_name, );
+ }

5a.
/becasue/because/

~

5b.
I am not sure about what name this should ideally use, but anyway for
uniqueness doesn't it still need to include the GetSystemIdentifier()
same as function ReplicationSlotNameForTablesync() was doing?

Maybe this can use the same function ReplicationSlotNameForTablesync()
can be used but just pass the worker_slot instead of the relid?

==
src/backend/replication/logical/worker.c

6. LogicalRepApplyLoop

  /*
  * Init the ApplyMessageContext which we clean up after each replication
- * protocol message.
+ * protocol message, if needed.
  */
- ApplyMessageContext = AllocSetContextCreate(ApplyContext,
- "ApplyMessageContext",
- ALLOCSET_DEFAULT_SIZES);
+ if (!ApplyMessageContext)
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+

Maybe slightly reword the comment.

BEFORE:
Init the ApplyMessageContext which we clean up after each replication
protocol message, if needed.

AFTER:
Init the ApplyMessageContext if needed. This context is cleaned up
after each replication protocol message.

==
src/backend/replication/walsender.c

7.
+ /*
+ * Initialize the flag again because this streaming may be
+ * second time.
+ */
+ streamingDoneSending = streamingDoneReceiving = false;

Isn't this only possible to be 2nd time because the "reuse tablesync
worker" might re-issue a START_REPLICATION again to the same
WALSender? So, should this flag reset ONLY be done for the logical
replication ('else' part), otherwise it should be asserted false?

e.g. Would it be better to be like this?

if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
Assert(!streamingDoneSending && !streamingDoneReceiving)
StartReplication(cmd);
}
else
{
/* Reset flags because reusing tablesync workers can mean this is the
second time here. */
streamingDoneSending = streamingDoneReceiving = false;
StartLogicalReplication(cmd);
}

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-10 Thread Melih Mutlu
Hi,

Hayato Kuroda (Fujitsu) , 6 Tem 2023 Per,
12:47 tarihinde şunu yazdı:
>
> Dear Melih,
>
> > Thanks for the 0003 patch. But it did not work for me. Can you create
> > a subscription successfully with patch 0003 applied?
> > I get the following error: " ERROR:  table copy could not start
> > transaction on publisher: another command is already in progress".
>
> You got the ERROR when all the patches (0001-0005) were applied, right?
> I have focused on 0001 and 0002 only, so I missed something.
> If it was not correct, please attach the logfile and test script what you did.

Yes, I did get an error with all patches applied. But with only 0001
and 0002, your version seems like working and mine does not.
What do you think about combining 0002 and 0003? Or should those stay separate?

> Hi, I did a performance testing for v16 patch set.
> Results show that patches significantly improves the performance in most 
> cases.
>
> # Method
>
> Following tests were done 10 times per condition, and compared by median.
> do_one_test.sh was used for the testing.
>
> 1.  Create tables on publisher
> 2.  Insert initial data on publisher
> 3.  Create tables on subscriber
> 4.  Create a replication slot (mysub_slot) on publisher
> 5.  Create a publication on publisher
> 6.  Create tables on subscriber
> --- timer on ---
> 7.  Create subscription with pre-existing replication slot (mysub_slot)
> 8.  Wait until all srsubstate in pg_subscription_rel becomes 'r'
> --- timer off ---
>

Thanks for taking the time to do testing and sharing the results. This
is also how I've been doing the testing since, but the process was
half scripted, half manual work.

> According to the measurement, we can say following things:
>
> * In any cases the performance was improved from the HEAD.
> * The improvement became more significantly if number of synced tables were 
> increased.

Yes, I believe it becomes more significant when workers spend less
time with actually copying data but more with other stuff like
launching workers, opening connections etc.

> * 0003 basically improved performance from first two patches

Agree, 0003 is definitely a good addition which was missing earlier.


Thanks,
-- 
Melih Mutlu
Microsoft




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-10 Thread Melih Mutlu
Hi,

Amit Kapila , 6 Tem 2023 Per, 06:56 tarihinde
şunu yazdı:
>
> On Wed, Jul 5, 2023 at 1:48 AM Melih Mutlu  wrote:
> >
> > Hayato Kuroda (Fujitsu) , 4 Tem 2023 Sal,
> > 08:42 tarihinde şunu yazdı:
> > > > > But in the later patch the tablesync worker tries to reuse the slot 
> > > > > during the
> > > > > synchronization, so in this case the application_name should be same 
> > > > > as
> > > > slotname.
> > > > >
> > > >
> > > > Fair enough. I am slightly afraid that if we can't show the benefits
> > > > with later patches then we may need to drop them but at this stage I
> > > > feel we need to investigate why those are not helping?
> > >
> > > Agreed. Now I'm planning to do performance testing independently. We can 
> > > discuss
> > > based on that or Melih's one.
> >
> > Here I attached  what I use for performance testing of this patch.
> >
> > I only benchmarked the patch set with reusing connections very roughly
> > so far. But seems like it improves quite significantly. For example,
> > it took 611 ms to sync 100 empty tables, it was 1782 ms without
> > reusing connections.
> > First 3 patches from the set actually bring a good amount of
> > improvement, but not sure about the later patches yet.
> >
>
> I suggest then we should focus first on those 3, get them committed
> and then look at the remaining.
>

That sounds good. I'll do my best to address any review/concern from
reviewers now for the first 3 patches and hopefully those can get
committed first. I'll continue working on the remaining patches later.

-- 
Melih Mutlu
Microsoft




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-10 Thread Peter Smith
Hi, here are some review comments for patch v16-0002.

==
Commit message

1.
This commit allows reusing tablesync workers for syncing more than one
table sequentially during their lifetime, instead of exiting after
only syncing one table.

Before this commit, tablesync workers were capable of syncing only one
table. For each table, a new sync worker was launched and that worker would
exit when done processing the table.

Now, tablesync workers are not limited to processing only one
table. When done, they can move to processing another table in
the same subscription.

~

IMO that first paragraph can be removed because AFAIK the other
paragraphs are saying exactly the same thing but worded differently.

==
src/backend/replication/logical/tablesync.c

2. General -- for clean_sync_worker and finish_sync_worker

TBH, I found the separation of clean_sync_worker() and
finish_sync_worker() to be confusing. Can't it be rearranged to keep
the same function but just pass a boolean to tell it to exit or not
exit?

e.g.

finish_sync_worker(bool reuse_worker) { ... }

~~~

3. clean_sync_worker

  /*
- * Commit any outstanding transaction. This is the usual case, unless
- * there was nothing to do for the table.
+ * Commit any outstanding transaction. This is the usual case, unless there
+ * was nothing to do for the table.
  */

The word wrap seems OK, except the change seemed unrelated to this patch (??)

~~~

4.
+ /*
+ * Disconnect from publisher. Otherwise reused sync workers causes
+ * exceeding max_wal_senders
+ */

Missing period, and not an English sentence.

SUGGESTION (??)
Disconnect from the publisher otherwise reusing the sync worker can
error due to exceeding max_wal_senders.

~~~

5. finish_sync_worker

+/*
+ * Exit routine for synchronization worker.
+ */
+void
+pg_attribute_noreturn()
+finish_sync_worker(void)
+{
+ clean_sync_worker();
+
  /* And flush all writes. */
  XLogFlush(GetXLogWriteRecPtr());

  StartTransactionCommand();
  ereport(LOG,
- (errmsg("logical replication table synchronization worker for
subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid;
+ (errmsg("logical replication table synchronization worker for
subscription \"%s\" has finished",
+ MySubscription->name)));
  CommitTransactionCommand();

In the original code, the XLogFlush was in a slightly different order
than in this refactored code. E.g. it came before signalling the apply
worker. Is it OK to be changed?

Keeping one function (suggested in #2) can maybe remove this potential issue.

==
src/backend/replication/logical/worker.c

6. LogicalRepApplyLoop

+ /*
+ * apply_dispatch() may have gone into apply_handle_commit()
+ * which can call process_syncing_tables_for_sync.
+ *
+ * process_syncing_tables_for_sync decides whether the sync of
+ * the current table is completed. If it is completed,
+ * streaming must be already ended. So, we can break the loop.
+ */
+ if (MyLogicalRepWorker->is_sync_completed)
+ {
+ endofstream = true;
+ break;
+ }
+

and

+ /*
+ * If is_sync_completed is true, this means that the tablesync
+ * worker is done with synchronization. Streaming has already been
+ * ended by process_syncing_tables_for_sync. We should move to the
+ * next table if needed, or exit.
+ */
+ if (MyLogicalRepWorker->is_sync_completed)
+ endofstream = true;

~

Instead of those code fragments above assigning 'endofstream' as a
side-effect, would it be the same (but tidier) to just modify the
other "breaking" condition below:

BEFORE:
/* Check if we need to exit the streaming loop. */
if (endofstream)
break;

AFTER:
/* Check if we need to exit the streaming loop. */
if (endofstream || MyLogicalRepWorker->is_sync_completed)
break;

~~~

7. LogicalRepApplyLoop

+ /*
+ * Tablesync workers should end streaming before exiting the main loop to
+ * drop replication slot. Only end streaming here for apply workers.
+ */
+ if (!am_tablesync_worker())
+ walrcv_endstreaming(LogRepWorkerWalRcvConn, );

This comment does not seem very clear. Maybe it can be reworded:

SUGGESTION
End streaming here only for apply workers. Ending streaming for
tablesync workers is deferred until ... because ...

~~~

8. TablesyncWorkerMain

+ StartTransactionCommand();
+ ereport(LOG,
+ (errmsg("%s for subscription \"%s\" has moved to sync table \"%s\"
with relid %u.",
+ get_worker_name(),
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid),
+ MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();

The "has moved to..." terminology is unusual. If you say something
"will be reused to..." then it matches better the commit message etc.

~~~

9.

+ if (!is_table_found)
+ break;

Instead of an infinite loop that is exited by this 'break' it might be
better to rearrange the logic slightly so the 'for' loop can exit
normally:

BEFORE:
for (;;)

AFTER
for (; !done;)

==
src/include/replication/worker_internal.h

10.
  XLogRecPtr relstate_lsn;
  slock_t relmutex;

+ /*
+ * 

RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-09 Thread Hayato Kuroda (Fujitsu)
Dear hackers,

Hi, I did a performance testing for v16 patch set.
Results show that patches significantly improves the performance in most cases.

# Method

Following tests were done 10 times per condition, and compared by median.
do_one_test.sh was used for the testing.

1.  Create tables on publisher
2.  Insert initial data on publisher
3.  Create tables on subscriber
4.  Create a replication slot (mysub_slot) on publisher
5.  Create a publication on publisher
6.  Create tables on subscriber
--- timer on ---
7.  Create subscription with pre-existing replication slot (mysub_slot)
8.  Wait until all srsubstate in pg_subscription_rel becomes 'r'
--- timer off ---

# Tested sources

I used three types of sources

* HEAD (f863d82)
* HEAD + 0001 + 0002
* HEAD + 0001 + 0002 + 0003

# Tested conditions

Following parameters were changed during the measurement.

### table size

* empty
* around 10kB

### number of tables

* 10
* 100
* 1000
* 2000

### max_sync_workers_per_subscription

* 2
* 4
* 8
* 16

## Results

Please see the attached image file. Each cell shows the improvement percentage 
of
measurement comapred with HEAD, HEAD + 0001 + 0002, and HEAD + 0001 + 0002 + 
0003.

According to the measurement, we can say following things:

* In any cases the performance was improved from the HEAD.
* The improvement became more significantly if number of synced tables were 
increased.
* 0003 basically improved performance from first two patches
* Increasing workers could sometimes lead to lesser performance due to 
contention.
  This was occurred when the number of tables were small. Moreover, this was 
not only happen by patchset - it happened even if we used HEAD.
  Detailed analysis will be done later.

Mored deital, please see the excel file. It contains all the results of 
measurement.

## Detailed configuration

* Powerful machine was used:
 - Number of CPU: 120
 - Memory: 755 GB

* Both publisher and subscriber were on the same machine.
* Following GUC settings were used for both pub/sub:

```
wal_level = logical
shared_buffers = 40GB
max_worker_processes = 32
max_parallel_maintenance_workers = 24
max_parallel_workers = 32
synchronous_commit = off
checkpoint_timeout = 1d
max_wal_size = 24GB
min_wal_size = 15GB
autovacuum = off
max_wal_senders = 200
max_replication_slots = 200
```

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



perftest_result.xlsx
Description: perftest_result.xlsx


do_one_test.sh
Description: do_one_test.sh


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-07 Thread Peter Smith
Hi. Here are some review comments for the patch v16-0001

==
Commit message.

1.
Also; most of the code shared by both worker types are already combined
in LogicalRepApplyLoop(). There is no need to combine the rest in
ApplyWorkerMain() anymore.

~

/are already/is already/

/Also;/Also,/

~~~

2.
This commit introduces TablesyncWorkerMain() as a new entry point for
tablesync workers and separates both type of workers from each other.
This aims to increase code readability and help to maintain logical
replication workers separately.

2a.
/This commit/This patch/

~

2b.
"and separates both type of workers from each other"

Maybe that part can all be removed. The following sentence says the
same again anyhow.

==
src/backend/replication/logical/worker.c

3.
 static void stream_write_change(char action, StringInfo s);
 static void stream_open_and_write_change(TransactionId xid, char
action, StringInfo s);
 static void stream_close_file(void);
+static void set_stream_options(WalRcvStreamOptions *options,
+char *slotname,
+XLogRecPtr *origin_startpos);

~

Maybe a blank line was needed here because this static should not be
grouped with the other functions that are grouped for "Serialize and
deserialize changes for a toplevel transaction." comment.

~~~

4. set_stream_options

+ /* set_stream_options
+  * Set logical replication streaming options.
+  *
+  * This function sets streaming options including replication slot name and
+  * origin start position. Workers need these options for logical replication.
+  */
+static void
+set_stream_options(WalRcvStreamOptions *options,

The indentation is not right for this function comment.

~~~

5. set_stream_options

+ /*
+ * Even when the two_phase mode is requested by the user, it remains as
+ * the tri-state PENDING until all tablesyncs have reached READY state.
+ * Only then, can it become ENABLED.
+ *
+ * Note: If the subscription has no tables then leave the state as
+ * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+ * work.
+ */
+ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+ AllTablesyncsReady())
+ options->proto.logical.twophase = true;
+}

This part of the refactoring seems questionable...

IIUC this new function was extracted from code in originally in
function ApplyWorkerMain()

But in that original code, this fragment above was guarded by the condition
if (!am_tablesync_worker())

But now where is that condition? e.g. What is stopping tablesync
working from getting into this code it previously would not have
executed?

~~~

6.
  AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ pgstat_report_subscription_error(MySubscription->oid,
+ !am_tablesync_worker());

Does this change have anything to do with this patch? Is it a quirk of
running pg_indent?

~~~
7. run_tablesync_worker

Since the stated intent of the patch is the separation of apply and
tablesync workers then shouldn't this function belong in the
tablesync.c file?

~~~
8. run_tablesync_worker

+ * Runs the tablesync worker.
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catchup.
+ */
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,

Nicer to have a blank line after the first sentence of that function comment?

~~~
9. run_apply_worker

+/*
+ * Runs the leader apply worker.
+ * It sets up replication origin, streaming options and then starts streaming.
+ */
+static void
+run_apply_worker(WalRcvStreamOptions *options,

Nicer to have a blank line after the first sentence of that function comment?

~~~
10. InitializeLogRepWorker

+/*
+ * Common initialization for logical replication workers; leader apply worker,
+ * parallel apply worker and tablesync worker.
  *
  * Initialize the database connection, in-memory subscription and necessary
  * config options.
  */
 void
-InitializeApplyWorker(void)
+InitializeLogRepWorker(void)

typo:

/workers;/workers:/

~~~
11. TablesyncWorkerMain

Since the stated intent of the patch is the separation of apply and
tablesync workers then shouldn't this function belong in the
tablesync.c file?

==
src/include/replication/worker_internal.h

12.
 #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)

+extern void finish_sync_worker(void);

~

I think the macro isParallelApplyWorker is associated with the am_XXX
inline functions that follow it, so it doesn’t seem the best place to
jam this extern in the middle of that.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-05 Thread Amit Kapila
On Wed, Jul 5, 2023 at 1:48 AM Melih Mutlu  wrote:
>
> Hayato Kuroda (Fujitsu) , 4 Tem 2023 Sal,
> 08:42 tarihinde şunu yazdı:
> > > > But in the later patch the tablesync worker tries to reuse the slot 
> > > > during the
> > > > synchronization, so in this case the application_name should be same as
> > > slotname.
> > > >
> > >
> > > Fair enough. I am slightly afraid that if we can't show the benefits
> > > with later patches then we may need to drop them but at this stage I
> > > feel we need to investigate why those are not helping?
> >
> > Agreed. Now I'm planning to do performance testing independently. We can 
> > discuss
> > based on that or Melih's one.
>
> Here I attached  what I use for performance testing of this patch.
>
> I only benchmarked the patch set with reusing connections very roughly
> so far. But seems like it improves quite significantly. For example,
> it took 611 ms to sync 100 empty tables, it was 1782 ms without
> reusing connections.
> First 3 patches from the set actually bring a good amount of
> improvement, but not sure about the later patches yet.
>

I suggest then we should focus first on those 3, get them committed
and then look at the remaining.

> Amit Kapila , 3 Tem 2023 Pzt, 08:59 tarihinde
> şunu yazdı:
> > On thinking about this, I think the primary benefit we were expecting
> > by saving network round trips for slot drop/create but now that we
> > anyway need an extra round trip to establish a snapshot, so such a
> > benefit was not visible. This is just a theory so we should validate
> > it. The another idea as discussed before [1] could be to try copying
> > multiple tables in a single transaction. Now, keeping a transaction
> > open for a longer time could have side-effects on the publisher node.
> > So, we probably need to ensure that we don't perform multiple large
> > syncs and even for smaller tables (and later sequences) perform it
> > only for some threshold number of tables which we can figure out by
> > some tests. Also, the other safety-check could be that anytime we need
> > to perform streaming (sync with apply worker), we won't copy more
> > tables in same transaction.
> >
> > Thoughts?
>
> Yeah, maybe going to the publisher for creating a slot or only a
> snapshot does not really make enough difference. I was hoping that
> creating only snapshot by an existing replication slot would help the
> performance. I guess I was either wrong or am missing something in the
> implementation.
>
> The tricky bit with keeping a long transaction to copy multiple tables
> is deciding how many tables one transaction can copy.
>

Yeah, I was thinking that we should not allow copying some threshold
data in one transaction. After every copy, we will check the size of
the table and add it to the previously copied table size in the same
transaction. Once the size crosses a certain threshold, we will end
the transaction. This may not be a very good scheme but I think it
this helps then it would be much simpler than creating-only-snapshot
approach.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-04 Thread Melih Mutlu
Hayato Kuroda (Fujitsu) , 4 Tem 2023 Sal,
08:42 tarihinde şunu yazdı:
> > > But in the later patch the tablesync worker tries to reuse the slot 
> > > during the
> > > synchronization, so in this case the application_name should be same as
> > slotname.
> > >
> >
> > Fair enough. I am slightly afraid that if we can't show the benefits
> > with later patches then we may need to drop them but at this stage I
> > feel we need to investigate why those are not helping?
>
> Agreed. Now I'm planning to do performance testing independently. We can 
> discuss
> based on that or Melih's one.

Here I attached  what I use for performance testing of this patch.

I only benchmarked the patch set with reusing connections very roughly
so far. But seems like it improves quite significantly. For example,
it took 611 ms to sync 100 empty tables, it was 1782 ms without
reusing connections.
First 3 patches from the set actually bring a good amount of
improvement, but not sure about the later patches yet.

Amit Kapila , 3 Tem 2023 Pzt, 08:59 tarihinde
şunu yazdı:
> On thinking about this, I think the primary benefit we were expecting
> by saving network round trips for slot drop/create but now that we
> anyway need an extra round trip to establish a snapshot, so such a
> benefit was not visible. This is just a theory so we should validate
> it. The another idea as discussed before [1] could be to try copying
> multiple tables in a single transaction. Now, keeping a transaction
> open for a longer time could have side-effects on the publisher node.
> So, we probably need to ensure that we don't perform multiple large
> syncs and even for smaller tables (and later sequences) perform it
> only for some threshold number of tables which we can figure out by
> some tests. Also, the other safety-check could be that anytime we need
> to perform streaming (sync with apply worker), we won't copy more
> tables in same transaction.
>
> Thoughts?

Yeah, maybe going to the publisher for creating a slot or only a
snapshot does not really make enough difference. I was hoping that
creating only snapshot by an existing replication slot would help the
performance. I guess I was either wrong or am missing something in the
implementation.

The tricky bit with keeping a long transaction to copy multiple tables
is deciding how many tables one transaction can copy.

Thanks,
-- 
Melih Mutlu
Microsoft
--- on publisher
SELECT 'CREATE TABLE manytables_'||i||'(i int);' FROM generate_series(1, 100) 
g(i) \gexec
SELECT pg_create_logical_replication_slot('mysub_slot', 'pgoutput');

--- on subscriber
SELECT 'CREATE TABLE manytables_'||i||'(i int);' FROM generate_series(1, 100) 
g(i) \gexec

CREATE OR REPLACE PROCEDURE log_rep_test(max INTEGER) AS $$
DECLARE
counter INTEGER := 1;
total_duration INTERVAL := '0';
avg_duration FLOAT := 0.0;
start_time TIMESTAMP;
end_time TIMESTAMP;
BEGIN
WHILE counter <= max LOOP

EXECUTE 'DROP SUBSCRIPTION IF EXISTS mysub;';

start_time := clock_timestamp();
EXECUTE 'CREATE SUBSCRIPTION mysub CONNECTION ''dbname=postgres 
port=5432'' PUBLICATION mypub WITH (create_slot=false, 
slot_name=''mysub_slot'');';
COMMIT;

WHILE EXISTS (SELECT 1 FROM pg_subscription_rel WHERE srsubstate != 
'r') LOOP
COMMIT;
END LOOP;

end_time := clock_timestamp();


EXECUTE 'ALTER SUBSCRIPTION mysub DISABLE;';
EXECUTE 'ALTER SUBSCRIPTION mysub SET (slot_name = none);';


total_duration := total_duration + (end_time - start_time);

counter := counter + 1;
END LOOP;

IF max > 0 THEN
avg_duration := EXTRACT(EPOCH FROM total_duration) / max * 1000;
END IF;

RAISE NOTICE '%', avg_duration;
END;
$$ LANGUAGE plpgsql;


call log_rep_test(5);

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-03 Thread vignesh C
On Wed, 28 Jun 2023 at 12:02, Hayato Kuroda (Fujitsu)
 wrote:
>
> Dear Amit,
>
> > > > This actually makes sense. I quickly try to do that without adding any
> > > > new replication message. As you would expect, it did not work.
> > > > I don't really know what's needed to make a connection to last for
> > > > more than one iteration. Need to look into this. Happy to hear any
> > > > suggestions and thoughts.
> > >
> >
> > It is not clear to me what exactly you tried here which didn't work.
> > Can you please explain a bit more?
>
> Just to confirm, this is not my part. Melih can answer this...
>
> > > I have analyzed how we handle this. Please see attached the patch (0003) 
> > > which
> > > allows reusing connection.
> > >
> >
> > Why did you change the application name during the connection?
>
> It was because the lifetime of tablesync worker is longer than slots's one and
> tablesync worker creates temporary replication slots many times, per the 
> target
> relation. The name of each slots has relid, so I thought that it was not 
> suitable.
> But in the later patch the tablesync worker tries to reuse the slot during the
> synchronization, so in this case the application_name should be same as 
> slotname.
>
> I added comment in 0003, and new file 0006 file to use slot name as 
> application_name
> again. Note again that the separation was just for specifying changes, Melih 
> can
> include them to one part of files if needed.

Few comments:
1) Should these error messages say "Could not create a snapshot by
replication slot":
+   if (!pubnames_str)
+   ereport(ERROR,
+   (errcode(ERRCODE_OUT_OF_MEMORY),
 /* likely guess */
+errmsg("could not start WAL streaming: %s",
+
pchomp(PQerrorMessage(conn->streamConn);
+   pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
+
strlen(pubnames_str));
+   if (!pubnames_literal)
+   ereport(ERROR,
+   (errcode(ERRCODE_OUT_OF_MEMORY),
 /* likely guess */
+errmsg("could not start WAL streaming: %s",
+
pchomp(PQerrorMessage(conn->streamConn);
+   appendStringInfo(, ", publication_names %s", pubnames_literal);
+   PQfreemem(pubnames_literal);
+   pfree(pubnames_str);

2) These checks are present in CreateReplicationSlot too, can we have
a common function to check these for both CreateReplicationSlot and
CreateReplicationSnapshot:
+   if (!IsTransactionBlock())
+   ereport(ERROR,
+   (errmsg("%s must be called inside a
transaction",
+
"CREATE_REPLICATION_SNAPSHOT ...")));
+
+   if (XactIsoLevel != XACT_REPEATABLE_READ)
+   ereport(ERROR,
+   (errmsg("%s must be called in
REPEATABLE READ isolation mode transaction",
+
"CREATE_REPLICATION_SNAPSHOT ...")));
+
+   if (!XactReadOnly)
+   ereport(ERROR,
+   (errmsg("%s must be called in a read
only transaction",
+
"CREATE_REPLICATION_SNAPSHOT ...")));
+
+   if (FirstSnapshotSet)
+   ereport(ERROR,
+   (errmsg("%s must be called before any query",
+
"CREATE_REPLICATION_SNAPSHOT ...")));
+
+   if (IsSubTransaction())
+   ereport(ERROR,
+   (errmsg("%s must not be called in a
subtransaction",
+
"CREATE_REPLICATION_SNAPSHOT ...")));

3) Probably we can add the function header at this point of time:
+/*
+ * TODO
+ */
+static void
+libpqrcv_slot_snapshot(WalReceiverConn *conn,
+  char *slotname,
+  const WalRcvStreamOptions *options,
+  XLogRecPtr *lsn)

4) Either or relation name or relid should be sufficient here, no need
to print both:
   StartTransactionCommand();
+   ereport(LOG,
+   (errmsg("%s
for subscription \"%s\" has moved to sync table \"%s\" with relid
%u.",
+
 get_worker_name(),
+
 MySubscription->name,
+
 get_rel_name(MyLogicalRepWorker->relid),
+
 MyLogicalRepWorker->relid)));
+   CommitTransactionCommand();

5) Why is this check of logicalrep_worker_find is required required,
will it not be sufficient to pick the relations that are in
SUBREL_STATE_INIT state?
+   /*
+   * Pick the table for the next run if
it is not already picked up
+   * by another worker.
+   *
+   * Take exclusive lock to prevent any
other sync worker from picking
+   * the same table.
+   */
+   LWLockAcquire(LogicalRepWorkerLock,

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-02 Thread Amit Kapila
On Mon, Jul 3, 2023 at 9:42 AM Amit Kapila  wrote:
>
> On Wed, Jun 28, 2023 at 12:02 PM Hayato Kuroda (Fujitsu)
>  wrote:
>
> > But in the later patch the tablesync worker tries to reuse the slot during 
> > the
> > synchronization, so in this case the application_name should be same as 
> > slotname.
> >
>
> Fair enough. I am slightly afraid that if we can't show the benefits
> with later patches then we may need to drop them but at this stage I
> feel we need to investigate why those are not helping?
>

On thinking about this, I think the primary benefit we were expecting
by saving network round trips for slot drop/create but now that we
anyway need an extra round trip to establish a snapshot, so such a
benefit was not visible. This is just a theory so we should validate
it. The another idea as discussed before [1] could be to try copying
multiple tables in a single transaction. Now, keeping a transaction
open for a longer time could have side-effects on the publisher node.
So, we probably need to ensure that we don't perform multiple large
syncs and even for smaller tables (and later sequences) perform it
only for some threshold number of tables which we can figure out by
some tests. Also, the other safety-check could be that anytime we need
to perform streaming (sync with apply worker), we won't copy more
tables in same transaction.

Thoughts?

[1] - 
https://www.postgresql.org/message-id/CAGPVpCRWEVhXa7ovrhuSQofx4to7o22oU9iKtrOgAOtz_%3DY6vg%40mail.gmail.com

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-07-02 Thread Amit Kapila
On Wed, Jun 28, 2023 at 12:02 PM Hayato Kuroda (Fujitsu)
 wrote:
>
> > > I have analyzed how we handle this. Please see attached the patch (0003) 
> > > which
> > > allows reusing connection.
> > >
> >
> > Why did you change the application name during the connection?
>
> It was because the lifetime of tablesync worker is longer than slots's one and
> tablesync worker creates temporary replication slots many times, per the 
> target
> relation. The name of each slots has relid, so I thought that it was not 
> suitable.
>

Okay, but let's try to give a unique application name to each
tablesync worker for the purpose of pg_stat_activity and synchronous
replication (as mentioned in existing comments as well). One idea is
to generate a name like pg__sync_ but feel free
to suggest if you have any better ideas.

> But in the later patch the tablesync worker tries to reuse the slot during the
> synchronization, so in this case the application_name should be same as 
> slotname.
>

Fair enough. I am slightly afraid that if we can't show the benefits
with later patches then we may need to drop them but at this stage I
feel we need to investigate why those are not helping?

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-27 Thread Amit Kapila
On Tue, Jun 27, 2023 at 1:12 PM Hayato Kuroda (Fujitsu)
 wrote:
>
> > This actually makes sense. I quickly try to do that without adding any
> > new replication message. As you would expect, it did not work.
> > I don't really know what's needed to make a connection to last for
> > more than one iteration. Need to look into this. Happy to hear any
> > suggestions and thoughts.
>

It is not clear to me what exactly you tried here which didn't work.
Can you please explain a bit more?

> I have analyzed how we handle this. Please see attached the patch (0003) which
> allows reusing connection.
>

Why did you change the application name during the connection?

-- 
With Regards,
Amit Kapila.




RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-27 Thread Hayato Kuroda (Fujitsu)
Dear Melih,

Thanks for updating the patch. Followings are my comments.
Note that some lines exceeds 80 characters and some other lines seem too short.
And comments about coding conventions were skipped.

0001

01. logicalrep_worker_launch()

```
if (is_parallel_apply_worker)
+   {
snprintf(bgw.bgw_function_name, BGW_MAXLEN, 
"ParallelApplyWorkerMain");
-   else
-   snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-
-   if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
-"logical replication worker for subscription 
%u sync %u", subid, relid);
-   else if (is_parallel_apply_worker)
+"logical replication parallel apply worker for 
subscription %u", subid);
snprintf(bgw.bgw_name, BGW_MAXLEN,
 "logical replication parallel apply worker for 
subscription %u", subid);
```

Latter snprintf(bgw.bgw_name...) should be snprintf(bgw.bgw_type, BGW_MAXLEN, 
"logical replication worker").

02. ApplyWorkerMain

```
/*
 * Setup callback for syscache so that we know when something changes in
-* the subscription relation state.
+* the subscription relation state. Do this outside the loop to avoid
+* exceeding MAX_SYSCACHE_CALLBACKS
 */
```

I'm not sure this change is really needed. CacheRegisterSyscacheCallback() must
be outside the loop to avoid duplicated register, and it seems trivial.

0002

03. TablesyncWorkerMain()

Regarding the inner loop, the exclusive lock is acquired even if the rstate is
SUBREL_STATE_SYNCDONE. Moreover, palloc() and memcpy() for rstate seemsed not
needed. How about following?

```
for (;;)
{
List   *rstates;
-   SubscriptionRelState *rstate;
ListCell   *lc;
...
-   rstate = (SubscriptionRelState *) 
palloc(sizeof(SubscriptionRelState));
 
foreach(lc, rstates)
{
-   memcpy(rstate, lfirst(lc), 
sizeof(SubscriptionRelState));
+   SubscriptionRelState *rstate =
+   
(SubscriptionRelState *) lfirst(lc);
+
+   if (rstate->state == SUBREL_STATE_SYNCDONE)
+   continue;

/*
-   * Pick the table for the next run if it is not 
already picked up
-   * by another worker.
-   *
-   * Take exclusive lock to prevent any other sync 
worker from picking
-   * the same table.
-   */
+* Take exclusive lock to prevent any other 
sync worker from
+* picking the same table.
+*/
LWLockAcquire(LogicalRepWorkerLock, 
LW_EXCLUSIVE);
-   if (rstate->state != SUBREL_STATE_SYNCDONE &&
-   
!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+
+   /*
+* Pick the table for the next run if it is not 
already picked up
+* by another worker.
+*/
+   if (!logicalrep_worker_find(MySubscription->oid,
+   
rstate->relid, false))
```

04. TablesyncWorkerMain

I think rstates should be pfree'd at the end of the outer loop, but it's OK
if other parts do not.

05. repsponse for for post

>
I tried to move the logicalrep_worker_wakeup call from
clean_sync_worker (end of an iteration) to finish_sync_worker (end of
sync worker). I made table sync much slower for some reason, then I
reverted that change. Maybe I should look a bit more into the reason
why that happened some time.
>

I want to see the testing method to reproduce the same issue, could you please
share it to -hackers?

0003, 0004

I did not checked yet but I could say same as above:
I want to see the testing method to reproduce the same issue.
Could you please share it to -hackers?
My previous post (an approach for reuse connection) may help the performance.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-27 Thread Amit Kapila
On Fri, Jun 23, 2023 at 7:03 PM Melih Mutlu  wrote:
>
> You can find the updated patchset attached.
> I worked to address the reviews and made some additional changes.
>
> Let me first explain the new patchset.
> 0001: Refactors the logical replication code, mostly worker.c and
> tablesync.c. Although this patch makes it easier to reuse workers, I
> believe that it's useful even by itself without other patches. It does
> not improve performance or anything but aims to increase readability
> and such.
> 0002: This is only to reuse worker processes, everything else stays
> the same (replication slots/origins etc.).
> 0003: Adds a new command for streaming replication protocol to create
> a snapshot by an existing replication slot.
> 0004: Reuses replication slots/origins together with workers.
>
> Even only 0001 and 0002 are enough to improve table sync performance
> at the rates previously shared on this thread. This also means that
> currently 0004 (reusing replication slots/origins) does not improve as
> much as I would expect, even though it does not harm either.
> I just wanted to share what I did so far, while I'm continuing to
> investigate it more to see what I'm missing in patch 0004.
>

I think the reason why you don't see the benefit of the 0004 patches
is that it still pays the cost of disconnect/connect and we haven't
saved much on network transfer costs because of the new snapshot you
are creating in patch 0003. Is it possible to avoid disconnect/connect
each time the patch needs to reuse the same tablesync worker? Once, we
do that and save the cost of drop_slot and associated network round
trip, you may see the benefit of 0003 and 0004 patches.

-- 
With Regards,
Amit Kapila.




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-25 Thread Peter Smith
On Fri, Jun 23, 2023 at 11:50 PM Melih Mutlu  wrote:
>
> > src/backend/replication/logical/worker.c
> >
> > 10. General -- run_tablesync_worker, TablesyncWorkerMain
> >
> > IMO these functions would be more appropriately reside in the
> > tablesync.c instead of the (common) worker.c. Was there some reason
> > why they cannot be put there?
>
> I'm not really against moving those functions to tablesync.c. But
> what's not clear to me is worker.c. Is it the places to put common
> functions for all log. rep. workers? Then, what about apply worker?
> Should we consider a separate file for apply worker too?

IIUC
- tablesync.c = for tablesync only
- applyparallelworker = for parallel apply worker only
- worker.c = for both normal apply worker, plus "common" worker code

Regarding making another file (e.g. applyworker.c). It sounds
sensible, but I guess you would need to first demonstrate the end
result will be much cleaner to get support for such a big refactor.

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-23 Thread Melih Mutlu
Hi Peter,

Thanks for your reviews. I tried to apply most of them. I just have
some comments below for some of them.

Peter Smith , 14 Haz 2023 Çar, 08:45 tarihinde
şunu yazdı:
>
> 9. process_syncing_tables_for_sync
>
> @@ -378,7 +387,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
>   */
>   replorigin_drop_by_name(originname, true, false);
>
> - finish_sync_worker();
> + /*
> + * Sync worker is cleaned at this point. It's ready to sync next table,
> + * if needed.
> + */
> + SpinLockAcquire(>relmutex);
> + MyLogicalRepWorker->ready_to_reuse = true;
> + SpinLockRelease(>relmutex);
>
> 9a.
> I did not quite follow the logic. It says "Sync worker is cleaned at
> this point", but who is doing that? -- more details are needed. But,
> why not just call clean_sync_worker() right here like it use to call
> finish_sync_worker?

I agree that these explanations at places where the worker decides to
not continue with the current table were confusing. Even the name of
ready_to_reuse was misleading. I renamed it and tried to improve
comments in such places.
Can you please check if those make more sense now?


> ==
> src/backend/replication/logical/worker.c
>
> 10. General -- run_tablesync_worker, TablesyncWorkerMain
>
> IMO these functions would be more appropriately reside in the
> tablesync.c instead of the (common) worker.c. Was there some reason
> why they cannot be put there?

I'm not really against moving those functions to tablesync.c. But
what's not clear to me is worker.c. Is it the places to put common
functions for all log. rep. workers? Then, what about apply worker?
Should we consider a separate file for apply worker too?

Thanks,
-- 
Melih Mutlu
Microsoft




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-23 Thread Melih Mutlu
Hi,

Thanks for your reviews.

Hayato Kuroda (Fujitsu) , 13 Haz 2023 Sal,
13:06 tarihinde şunu yazdı:
> 01. general
>
> Why do tablesync workers have to disconnect from publisher for every 
> iterations?
> I think connection initiation overhead cannot be negligible in the postgres's 
> basic
> architecture. I have not checked yet, but could we add a new replication 
> message
> like STOP_STREAMING or CLEANUP? Or, engineerings for it is quite larger than 
> the benefit?

This actually makes sense. I quickly try to do that without adding any
new replication message. As you would expect, it did not work.
I don't really know what's needed to make a connection to last for
more than one iteration. Need to look into this. Happy to hear any
suggestions and thoughts.

> The sync worker sends a signal to its leader per the iteration, but it may be 
> too
> often. Maybe it is added for changing the rstate to READY, however, it is OK 
> to
> change it when the next change have come because 
> should_apply_changes_for_rel()
> returns true even if rel->state == SUBREL_STATE_SYNCDONE. I think the 
> notification
> should be done only at the end of sync workers. How do you think?

I tried to move the logicalrep_worker_wakeup call from
clean_sync_worker (end of an iteration) to finish_sync_worker (end of
sync worker). I made table sync much slower for some reason, then I
reverted that change. Maybe I should look a bit more into the reason
why that happened some time.

Thanks,
-- 
Melih Mutlu
Microsoft




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-13 Thread Peter Smith
Here are some review comments for the patch v2-0001.

==
Commit message

1. General
Better to use consistent terms in this message. Either "relations" or
"tables" -- not a mixture of both.

~~~

2.
Before this commit, tablesync workers were capable of syncing only one
relation. For each table, a new sync worker was launched and the worker
would exit when the worker is done with the current table.

~

SUGGESTION (2nd sentence)
For each table, a new sync worker was launched and that worker would
exit when done processing the table.

~~~

3.
Now, tablesync workers are not only limited with one relation and can
move to another relation in the same subscription. This reduces the
overhead of launching a new background worker and exiting from that
worker for each relation.

~

SUGGESTION (1st sentence)
Now, tablesync workers are not limited to processing only one
relation. When done, they can move to processing another relation in
the same subscription.

~~~

4.
A new tablesync worker gets launched only if the number of tablesync
workers for the subscription does not exceed
max_sync_workers_per_subscription. If there is a table needs to be synced,
a tablesync worker picks that up and syncs it.The worker continues to
picking new tables to sync until there is no table left for synchronization
in the subscription.

~

This seems to be missing the point that only "available" workers go
looking for more tables to process. Maybe reword something like below:

SUGGESTION
If there is a table that needs to be synced, an "available" tablesync
worker picks up that table and syncs it. Each tablesync worker
continues to pick new tables to sync until there are no tables left
requiring synchronization. If there was no "available" worker to
process the table, then a new tablesync worker will be launched,
provided the number of tablesync workers for the subscription does not
exceed max_sync_workers_per_subscription.

==
src/backend/replication/logical/launcher.c

5. logicalrep_worker_launch

@@ -460,8 +461,10 @@ retry:

  if (is_parallel_apply_worker)
  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
- else
+ else if (!OidIsValid(relid))
  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+ else
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");

  if (OidIsValid(relid))
  snprintf(bgw.bgw_name, BGW_MAXLEN,

~

5a.
I felt at least these conditions can be rearranged, so you can use
OidIsValid(relid) instead of !OidIsValid(relid).

~

5b.
Probably it can all be simplified, if you are happy to do it in one line:

snprintf(bgw.bgw_function_name, BGW_MAXLEN,
OidIsValid(relid) ? "TablesyncWorkerMain" :
is_parallel_apply_worker ? "ParallelApplyWorkerMain" :
"ApplyWorkerMain");

==
src/backend/replication/logical/tablesync.c

6. finish_sync_worker

This function is removed/renamed but there are still commenting in
this file referring to 'finish-sync_worker'

~~~

7. clean_sync_worker

I agree with comment from Shi-san. There should still be logging
somewhere that say this tablesync worker has completed the processing
of the current table.

~~~

8. sync_worker_exit

There is inconsistent function naming for clean_sync_worker versus
sync_worker_exit.

How about: clean_sync_worker/exit_sync_worker?
Or: sync_worker_clean/sync_worker_exit?

~~~

9. process_syncing_tables_for_sync

@@ -378,7 +387,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  */
  replorigin_drop_by_name(originname, true, false);

- finish_sync_worker();
+ /*
+ * Sync worker is cleaned at this point. It's ready to sync next table,
+ * if needed.
+ */
+ SpinLockAcquire(>relmutex);
+ MyLogicalRepWorker->ready_to_reuse = true;
+ SpinLockRelease(>relmutex);

9a.
I did not quite follow the logic. It says "Sync worker is cleaned at
this point", but who is doing that? -- more details are needed. But,
why not just call clean_sync_worker() right here like it use to call
finish_sync_worker?

~

9b.
Shouldn't this "ready_to_use" flag be assigned within the
clean_sync_worker() function, since that is the function making is
clean for next re-use. The function comment even says so: "Prepares
the synchronization worker for reuse or exit."

==
src/backend/replication/logical/worker.c

10. General -- run_tablesync_worker, TablesyncWorkerMain

IMO these functions would be more appropriately reside in the
tablesync.c instead of the (common) worker.c. Was there some reason
why they cannot be put there?

~~~

11. LogicalRepApplyLoop

+ /*
+ * apply_dispatch() may have gone into apply_handle_commit()
+ * which can go into process_syncing_tables_for_sync early.
+ * Before we were able to reuse tablesync workers, that
+ * process_syncing_tables_for_sync call would exit the worker
+ * instead of preparing for reuse. Now that tablesync workers
+ * can be reused and process_syncing_tables_for_sync is not
+ * responsible for exiting. We need to take care of memory
+ * contexts here before moving to sync the 

RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-13 Thread Hayato Kuroda (Fujitsu)
Dear Melih,

Thank you for making the patch!
I'm also interested in the patchset. Here are the comments for 0001.

Some codes are not suit for our coding conventions, but followings do not 
contain them
because patches seems in the early statge.
Moreover, 0003 needs rebase.

01. general

Why do tablesync workers have to disconnect from publisher for every iterations?
I think connection initiation overhead cannot be negligible in the postgres's 
basic
architecture. I have not checked yet, but could we add a new replication message
like STOP_STREAMING or CLEANUP? Or, engineerings for it is quite larger than 
the benefit?

02. logicalrep_worker_launch()

```
-   else
+   else if (!OidIsValid(relid))
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+   else
+   snprintf(bgw.bgw_function_name, BGW_MAXLEN, 
"TablesyncWorkerMain");
```

You changed the entry point of tablesync workers, but bgw_type is still the 
same.
Do you have any decisions about it? 

03. process_syncing_tables_for_sync()

```
+   /*
+* Sync worker is cleaned at this point. It's ready to sync 
next table,
+* if needed.
+*/
+   SpinLockAcquire(>relmutex);
+   MyLogicalRepWorker->ready_to_reuse = true;
+   SpinLockRelease(>relmutex);
```

Maybe acquiring the lock for modifying ready_to_reuse is not needed because all
the sync workers check only the own attribute. Moreover, other processes do not 
read.

04. sync_worker_exit()

```
+/*
+ * Exit routine for synchronization worker.
+ */
+void
+pg_attribute_noreturn()
+sync_worker_exit(void)
```

I think we do not have to rename the function from finish_sync_worker().

05. LogicalRepApplyLoop()

```
+   if (MyLogicalRepWorker->ready_to_reuse)
+   {
+   endofstream = true;
+   }
```

We should add comments here to clarify the reason.

06. stream_build_options()

I think we can set twophase attribute here.

07. TablesyncWorkerMain()

```
+   ListCell   *lc;
```

This variable should be declared inside the loop.

08. TablesyncWorkerMain()

```
+   /*
+* If a relation with INIT state is assigned, clean up the 
worker for
+* the next iteration.
+*
+* If there is no more work left for this worker, break the 
loop to
+* exit.
+*/
+   if ( MyLogicalRepWorker->relstate == SUBREL_STATE_INIT)
+   clean_sync_worker();
```

The sync worker sends a signal to its leader per the iteration, but it may be 
too
often. Maybe it is added for changing the rstate to READY, however, it is OK to
change it when the next change have come because should_apply_changes_for_rel()
returns true even if rel->state == SUBREL_STATE_SYNCDONE. I think the 
notification
should be done only at the end of sync workers. How do you think? 

Best Regards,
Hayato Kuroda
FUJITSU LIMITED



RE: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-05 Thread Yu Shi (Fujitsu)
On Thu, Jun 1, 2023 6:54 PM Melih Mutlu  wrote:
> 
> Hi,
> 
> I rebased the patch and addressed the following reviews.
> 

Thanks for updating the patch. Here are some comments on 0001 patch.

1.
-   ereport(LOG,
-   (errmsg("logical replication table synchronization 
worker for subscription \"%s\", table \"%s\" has finished",
-   MySubscription->name,
-   
get_rel_name(MyLogicalRepWorker->relid;

Could we move this to somewhere else instead of removing it?

2.
+   if (!OidIsValid(originid))
+   originid = replorigin_create(originname);
+   replorigin_session_setup(originid, 0);
+   replorigin_session_origin = originid;
+   *origin_startpos = replorigin_session_get_progress(false);
+   CommitTransactionCommand();
+
+   /* Is the use of a password mandatory? */
+   must_use_password = MySubscription->passwordrequired &&
+   !superuser_arg(MySubscription->owner);
+   LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+   
must_use_password,
+   
MySubscription->name, );

It seems that there is a problem when refactoring.
See commit e7e7da2f8d5.

3.
+   /* Set this to false for safety, in case we're already reusing the 
worker */
+   MyLogicalRepWorker->ready_to_reuse = false;
+

I am not sure do we need to lock when setting it.

4.
+   /*
+* Allocate the origin name in long-lived context for error context
+* message.
+*/
+   StartTransactionCommand();
+   ReplicationOriginNameForLogicalRep(MySubscription->oid,
+  
MyLogicalRepWorker->relid,
+  
originname,
+  
originname_size);
+   CommitTransactionCommand();

Do we need the call to StartTransactionCommand() and CommitTransactionCommand()
here? Besides, the comment here is the same as the comment atop
set_apply_error_context_origin(), do we need it?

5.
I saw a segmentation fault when debugging.

It happened when calling sync_worker_exit() called (see the code below in
LogicalRepSyncTableStart()). In the case that this is not the first table the
worker synchronizes, clean_sync_worker() has been called before (in
TablesyncWorkerMain()), and LogRepWorkerWalRcvConn has been set to NULL. Then, a
segmentation fault happened because LogRepWorkerWalRcvConn is a null pointer.

switch (relstate)
{
case SUBREL_STATE_SYNCDONE:
case SUBREL_STATE_READY:
case SUBREL_STATE_UNKNOWN:
sync_worker_exit(); /* doesn't return */
}

Here is the backtrace.

#0  0x7fc8a8ce4c95 in libpqrcv_disconnect (conn=0x0) at 
libpqwalreceiver.c:757
#1  0x0092b8c0 in clean_sync_worker () at tablesync.c:150
#2  0x0092b8ed in sync_worker_exit () at tablesync.c:164
#3  0x0092d8f6 in LogicalRepSyncTableStart 
(origin_startpos=0x7ffd50f30f08) at tablesync.c:1293
#4  0x00934f76 in start_table_sync (origin_startpos=0x7ffd50f30f08, 
myslotname=0x7ffd50f30e80) at worker.c:4457
#5  0x0093513b in run_tablesync_worker (options=0x7ffd50f30ec0, 
slotname=0x0, originname=0x7ffd50f30f10 "pg_16394_16395",
originname_size=64, origin_startpos=0x7ffd50f30f08) at worker.c:4532
#6  0x00935a3a in TablesyncWorkerMain (main_arg=1) at worker.c:4853
#7  0x008e97f6 in StartBackgroundWorker () at bgworker.c:864
#8  0x008f350b in do_start_bgworker (rw=0x12fc1a0) at postmaster.c:5762
#9  0x008f38b7 in maybe_start_bgworkers () at postmaster.c:5986
#10 0x008f2975 in process_pm_pmsignal () at postmaster.c:5149
#11 0x008ee98a in ServerLoop () at postmaster.c:1770
#12 0x008ee3bb in PostmasterMain (argc=3, argv=0x12c4af0) at 
postmaster.c:1463
#13 0x007b6d3a in main (argc=3, argv=0x12c4af0) at main.c:198


The steps to reproduce: 
Worker1, in TablesyncWorkerMain(), the relstate of new table to sync (obtained
by GetSubscriptionRelations) is SUBREL_STATE_INIT, and in the foreach loop,
before the following Check (it needs a breakpoint before locking),

LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
if (rstate->state != SUBREL_STATE_SYNCDONE &&
!logicalrep_worker_find(MySubscription->oid, 
rstate->relid, false))
{
/* Update worker state for the next table */
MyLogicalRepWorker->relid = rstate->relid;
MyLogicalRepWorker->relstate 

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-01 Thread Peter Smith
On Thu, Jun 1, 2023 at 7:22 AM Melih Mutlu  wrote:
>
> Hi Peter,
>
> Peter Smith , 26 May 2023 Cum, 10:30 tarihinde
> şunu yazdı:
> >
> > On Thu, May 25, 2023 at 6:59 PM Melih Mutlu  wrote:
> > Yes, I was mostly referring to the same as point 1 below about patch
> > 0001. I guess I just found the concept of mixing A) launching TSW (via
> > apply worker) with B) reassigning TSW to another relation (by the TSW
> > battling with its peers) to be a bit difficult to understand. I
> > thought most of the refactoring seemed to arise from choosing to do it
> > that way.
>
> No, the refactoring is not related to the way of assigning a new
> table. In fact, the patch did not include such refactoring a couple
> versions earlier [1] and was still assigning tables the same way. It
> was suggested here [2]. Then, I made the patch 0001 which includes
> some refactoring and only reuses the worker and nothing else. Also I
> find it more understandable this way, maybe it's a bit subjective.
>
> I feel that logical replication related files are getting more and
> more complex and hard to understand with each change. IMHO, even
> without reusing anything, those need some refactoring anyway. But for
> this patch, refactoring some places made it simpler to reuse workers
> and/or replication slots, regardless of how tables are assigned to
> TSW's.

If refactoring is wanted anyway (regardless of the chosen "reuse"
logic), then will it be better to split off a separate 0001 patch just
to get that part out of the way first?

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-06-01 Thread Melih Mutlu
Hi Peter,

Peter Smith , 26 May 2023 Cum, 10:30 tarihinde
şunu yazdı:
>
> On Thu, May 25, 2023 at 6:59 PM Melih Mutlu  wrote:
> Yes, I was mostly referring to the same as point 1 below about patch
> 0001. I guess I just found the concept of mixing A) launching TSW (via
> apply worker) with B) reassigning TSW to another relation (by the TSW
> battling with its peers) to be a bit difficult to understand. I
> thought most of the refactoring seemed to arise from choosing to do it
> that way.

No, the refactoring is not related to the way of assigning a new
table. In fact, the patch did not include such refactoring a couple
versions earlier [1] and was still assigning tables the same way. It
was suggested here [2]. Then, I made the patch 0001 which includes
some refactoring and only reuses the worker and nothing else. Also I
find it more understandable this way, maybe it's a bit subjective.

I feel that logical replication related files are getting more and
more complex and hard to understand with each change. IMHO, even
without reusing anything, those need some refactoring anyway. But for
this patch, refactoring some places made it simpler to reuse workers
and/or replication slots, regardless of how tables are assigned to
TSW's.

> +1. I think it would be nice to see POC of both ways for benchmark
> comparison because IMO performance is not the only consideration --
> unless there is an obvious winner, then they need to be judged also by
> the complexity of the logic, the amount of code that needed to be
> refactored, etc.

Will try to do that. But, like I mentioned above, I don't think that
such a change would reduce the complexity or number of lines changed.

> But it is difficult to get an overall picture of the behaviour. Mostly
> when benchmarks were posted you hold one variable fixed and show only
> one other varying. It always leaves me wondering -- what about not
> empty tables, or what about different numbers of tables etc. Is it
> possible to make some script to gather a bigger set of results so we
> can see everything at once? Perhaps then it will become clear there is
> some "sweet spot" where the patch is really good but beyond that it
> degrades (actually, who knows what it might show).

I actually shared the benchmarks with different numbers of tables and
sizes. But those were all with 2 workers. I guess you want a similar
benchmark with different numbers of workers.
Will work on this and share soon.



[1] 
https://www.postgresql.org/message-id/CAGPVpCQmEE8BygXr%3DHi2N2t2kOE%3DPJwofn9TX0J9J4crjoXarQ%40mail.gmail.com
[2] 
https://www.postgresql.org/message-id/CAAKRu_YKGyF%2BsvRQqe1th-mG9xLdzneWgh9H1z1DtypBkawkkw%40mail.gmail.com

Thanks,
-- 
Melih Mutlu
Microsoft




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-05-26 Thread Peter Smith
On Thu, May 25, 2023 at 6:59 PM Melih Mutlu  wrote:
>
> Hi,
>
>
> Peter Smith , 24 May 2023 Çar, 05:59 tarihinde şunu 
> yazdı:
>>
>> Hi, and thanks for the patch! It is an interesting idea.
>>
>> I have not yet fully read this thread, so below are only my first
>> impressions after looking at patch 0001. Sorry if some of these were
>> already discussed earlier.
>>
>> TBH the patch "reuse-workers" logic seemed more complicated than I had
>> imagined it might be.
>
>
> If you mean patch 0001 by the patch "reuse-workers", most of the complexity 
> comes with some refactoring to split apply worker and tablesync worker paths. 
> [1]
> If you mean the whole patch set, then I believe it's because reusing 
> replication slots also requires having a proper snapshot each time the worker 
> moves to a new table. [2]
>

Yes, I was mostly referring to the same as point 1 below about patch
0001. I guess I just found the concept of mixing A) launching TSW (via
apply worker) with B) reassigning TSW to another relation (by the TSW
battling with its peers) to be a bit difficult to understand. I
thought most of the refactoring seemed to arise from choosing to do it
that way.

>>
>>
>> 1.
>> IIUC with patch 0001, each/every tablesync worker (a.k.a. TSW) when it
>> finishes dealing with one table then goes looking to find if there is
>> some relation that it can process next. So now every TSW has a loop
>> where it will fight with every other available TSW over who will get
>> to process the next relation.
>>
>> Somehow this seems all backwards to me. Isn't it strange for the TSW
>> to be the one deciding what relation it would deal with next?
>>
>> IMO it seems more natural to simply return the finished TSW to some
>> kind of "pool" of available workers and the main Apply process can
>> just grab a TSW from that pool instead of launching a brand new one in
>> the existing function process_syncing_tables_for_apply(). Or, maybe
>> those "available" workers can be returned to a pool maintained within
>> the launcher.c code, which logicalrep_worker_launch() can draw from
>> instead of launching a whole new process?
>>
>> (I need to read the earlier posts to see if these options were already
>> discussed and rejected)
>
>
> I think ([3]) relying on a single apply worker for the assignment of several 
> tablesync workers might bring some overhead, it's possible that some 
> tablesync workers wait in idle until the apply worker assigns them something. 
> OTOH yes, the current approach makes tablesync workers race for a new table 
> to sync.

Yes, it might be slower than the 'patched' code because "available"
workers might be momentarily idle while they wait to be re-assigned to
the next relation. We would need to try it to find out.

> TBF both ways might be worth discussing/investigating more, before deciding 
> which way to go.

+1. I think it would be nice to see POC of both ways for benchmark
comparison because IMO performance is not the only consideration --
unless there is an obvious winner, then they need to be judged also by
the complexity of the logic, the amount of code that needed to be
refactored, etc.

>
>>
>> 2.
>> AFAIK the thing that identifies a  tablesync worker is the fact that
>> only TSW will have a 'relid'.
>>
>> But it feels very awkward to me to have a TSW marked as "available"
>> and yet that LogicalRepWorker must still have some OLD relid field
>> value lurking (otherwise it will forget that it is a "tablesync"
>> worker!).
>>
>> IMO perhaps it is time now to introduce some enum 'type' to the
>> LogicalRepWorker. Then an "in_use" type=TSW would have a valid 'relid'
>> whereas an "available" type=TSW would have relid == InvalidOid.
>
>
> Hmm, relid will be immediately updated when the worker moves to a new table. 
> And the time between finishing sync of a table and finding a new table to 
> sync should be minimal. I'm not sure how having an old relid for such a small 
> amount of time can do any harm.

There is no "harm", but it just didn't feel right to make the
LogicalRepWorker to transition through some meaningless state
("available" for re-use but still assigned some relid), just because
it was easy to do it that way. I think it is more natural for the
'relid' to be valid only when it is valid for the worker and to be
InvalidOid when it is not valid. --- Maybe this gripe would become
more apparent if the implementation use the "free-list" idea because
then you would have a lot of bogus relids assigned to the workers of
that list for longer than just a moment.

>
>>
>> 3.
>> Maybe I am mistaken, but it seems the benchmark results posted are
>> only using quite a small/default values for
>> "max_sync_workers_per_subscription", so I wondered how those results
>> are affected by increasing that GUC. I think having only very few
>> workers would cause more sequential processing, so conveniently the
>> effect of the patch avoiding re-launch might be seen in the best
>> possible light. OTOH, using more TSW 

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-05-25 Thread Melih Mutlu
Hi,


Peter Smith , 24 May 2023 Çar, 05:59 tarihinde şunu
yazdı:

> Hi, and thanks for the patch! It is an interesting idea.
>
> I have not yet fully read this thread, so below are only my first
> impressions after looking at patch 0001. Sorry if some of these were
> already discussed earlier.
>
> TBH the patch "reuse-workers" logic seemed more complicated than I had
> imagined it might be.
>

If you mean patch 0001 by the patch "reuse-workers", most of the complexity
comes with some refactoring to split apply worker and tablesync worker
paths. [1]
If you mean the whole patch set, then I believe it's because reusing
replication slots also requires having a proper snapshot each time the
worker moves to a new table. [2]


>
> 1.
> IIUC with patch 0001, each/every tablesync worker (a.k.a. TSW) when it
> finishes dealing with one table then goes looking to find if there is
> some relation that it can process next. So now every TSW has a loop
> where it will fight with every other available TSW over who will get
> to process the next relation.
>
> Somehow this seems all backwards to me. Isn't it strange for the TSW
> to be the one deciding what relation it would deal with next?
>
> IMO it seems more natural to simply return the finished TSW to some
> kind of "pool" of available workers and the main Apply process can
> just grab a TSW from that pool instead of launching a brand new one in
> the existing function process_syncing_tables_for_apply(). Or, maybe
> those "available" workers can be returned to a pool maintained within
> the launcher.c code, which logicalrep_worker_launch() can draw from
> instead of launching a whole new process?
>
> (I need to read the earlier posts to see if these options were already
> discussed and rejected)
>

I think ([3]) relying on a single apply worker for the assignment of
several tablesync workers might bring some overhead, it's possible that
some tablesync workers wait in idle until the apply worker assigns them
something. OTOH yes, the current approach makes tablesync workers race for
a new table to sync.
TBF both ways might be worth discussing/investigating more, before deciding
which way to go.


> 2.
> AFAIK the thing that identifies a  tablesync worker is the fact that
> only TSW will have a 'relid'.
>
> But it feels very awkward to me to have a TSW marked as "available"
> and yet that LogicalRepWorker must still have some OLD relid field
> value lurking (otherwise it will forget that it is a "tablesync"
> worker!).
>
> IMO perhaps it is time now to introduce some enum 'type' to the
> LogicalRepWorker. Then an "in_use" type=TSW would have a valid 'relid'
> whereas an "available" type=TSW would have relid == InvalidOid.
>

Hmm, relid will be immediately updated when the worker moves to a new
table. And the time between finishing sync of a table and finding a new
table to sync should be minimal. I'm not sure how having an old relid for
such a small amount of time can do any harm.


> 3.
> Maybe I am mistaken, but it seems the benchmark results posted are
> only using quite a small/default values for
> "max_sync_workers_per_subscription", so I wondered how those results
> are affected by increasing that GUC. I think having only very few
> workers would cause more sequential processing, so conveniently the
> effect of the patch avoiding re-launch might be seen in the best
> possible light. OTOH, using more TSW in the first place might reduce
> the overall tablesync time because the subscriber can do more work in
> parallel.



So I'm not quite sure what the goal is here. E.g. if the user doesn't

care much about how long tablesync phase takes then there is maybe no
> need for this patch at all. OTOH, I thought if a user does care about
> the subscription startup time, won't those users be opting for a much
> larger "max_sync_workers_per_subscription" in the first place?
> Therefore shouldn't the benchmarking be using a larger number too?


Regardless of how many tablesync workers there are, reusing workers will
speed things up if a worker has a chance to sync more than one table.
Increasing the number of tablesync workers, of course, improves the
tablesync performance. But if it doesn't make 100% parallel ( meaning that
# of sync workers != # of tables to sync), then reusing workers can bring
an additional improvement.

Here are some benchmarks similar to earlier, but with 100 tables and
different number of workers:

++-+-+-++
|| 2 workers   | 4 workers   | 6 workers   | 8 workers  |
++-+-+-++
| master | 2579.154 ms | 1383.153 ms | 1001.559 ms | 911.758 ms |
++-+-+-++
| patch  | 1724.230 ms | 853.894 ms  | 601.176 ms  | 496.395 ms |
++-+-+-++

So yes, increasing the number of workers makes it faster. But reusing
workers can still improve more.


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-05-23 Thread Peter Smith
Hi, and thanks for the patch! It is an interesting idea.

I have not yet fully read this thread, so below are only my first
impressions after looking at patch 0001. Sorry if some of these were
already discussed earlier.

TBH the patch "reuse-workers" logic seemed more complicated than I had
imagined it might be.

1.
IIUC with patch 0001, each/every tablesync worker (a.k.a. TSW) when it
finishes dealing with one table then goes looking to find if there is
some relation that it can process next. So now every TSW has a loop
where it will fight with every other available TSW over who will get
to process the next relation.

Somehow this seems all backwards to me. Isn't it strange for the TSW
to be the one deciding what relation it would deal with next?

IMO it seems more natural to simply return the finished TSW to some
kind of "pool" of available workers and the main Apply process can
just grab a TSW from that pool instead of launching a brand new one in
the existing function process_syncing_tables_for_apply(). Or, maybe
those "available" workers can be returned to a pool maintained within
the launcher.c code, which logicalrep_worker_launch() can draw from
instead of launching a whole new process?

(I need to read the earlier posts to see if these options were already
discussed and rejected)

~~

2.
AFAIK the thing that identifies a  tablesync worker is the fact that
only TSW will have a 'relid'.

But it feels very awkward to me to have a TSW marked as "available"
and yet that LogicalRepWorker must still have some OLD relid field
value lurking (otherwise it will forget that it is a "tablesync"
worker!).

IMO perhaps it is time now to introduce some enum 'type' to the
LogicalRepWorker. Then an "in_use" type=TSW would have a valid 'relid'
whereas an "available" type=TSW would have relid == InvalidOid.

~~

3.
Maybe I am mistaken, but it seems the benchmark results posted are
only using quite a small/default values for
"max_sync_workers_per_subscription", so I wondered how those results
are affected by increasing that GUC. I think having only very few
workers would cause more sequential processing, so conveniently the
effect of the patch avoiding re-launch might be seen in the best
possible light. OTOH, using more TSW in the first place might reduce
the overall tablesync time because the subscriber can do more work in
parallel.

So I'm not quite sure what the goal is here. E.g. if the user doesn't
care much about how long tablesync phase takes then there is maybe no
need for this patch at all. OTOH, I thought if a user does care about
the subscription startup time, won't those users be opting for a much
larger "max_sync_workers_per_subscription" in the first place?
Therefore shouldn't the benchmarking be using a larger number too?

==

Here are a few other random things noticed while looking at patch 0001:

1. Commit message

1a. typo /sequantially/sequentially/

1b. Saying "killed" and "killing" seemed a bit extreme and implies
somebody else is killing the process. But I think mostly tablesync is
just ending by a normal proc exit, so maybe reword this a bit.

~~~

2. It seemed odd that some -- clearly tablesync specific -- functions
are in the worker.c instead of in tablesync.c.

2a. e.g. clean_sync_worker

2b. e.g. sync_worker_exit

~~~

3. process_syncing_tables_for_sync

+ /*
+ * Sync worker is cleaned at this point. It's ready to sync next table,
+ * if needed.
+ */
+ SpinLockAcquire(>relmutex);
+ MyLogicalRepWorker->ready_to_reuse = true;
  SpinLockRelease(>relmutex);
+ }
+
+ SpinLockRelease(>relmutex);

Isn't there a double release of that mutex happening there?

--
Kind Regards,
Peter Smith.
Fujitsu Australia




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-04-04 Thread Gregory Stark (as CFM)
On Sun, 26 Feb 2023 at 19:11, Melanie Plageman
 wrote:
>
> This is cool! Thanks for working on this.
> I had a chance to review your patchset and I had some thoughts and
> questions.

It looks like this patch got a pretty solid review from Melanie
Plageman in February just before the CF started. It was never set to
Waiting on Author but I think that may be the right state for it.

-- 
Gregory Stark
As Commitfest Manager




Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-02-26 Thread Melanie Plageman
On Wed, Feb 22, 2023 at 8:04 AM Melih Mutlu  wrote:
>
> Hi Wang,
>
> Thanks for reviewing.
> Please see updated patches. [1]

This is cool! Thanks for working on this.
I had a chance to review your patchset and I had some thoughts and
questions.

I notice that you've added a new user-facing option to make a snapshot.
I think functionality to independently make a snapshot for use elsewhere
has been discussed in the past for the implementation of different
features (e.g. [1] pg_dump but they ended up using replication slots for
this I think?), but I'm not quite sure I understand all the implications
for providing a user-visible create snapshot command. Where can it be
used? When can the snapshot be used? In your patch's case, you know that
you can use the snapshot you are creating, but I just wonder if any
restrictions or caveats need be taken for its general use.

For the worker reuse portion of the code, could it be a separate patch
in the set? It could be independently committable and would be easier to
review (separate from repl slot reuse).

Given table sync worker reuse, I think it is worth considering a more
explicit structure for the table sync worker code now -- i.e. having a
TableSyncWorkerMain() function. Though they still do the
LogicalRepApplyLoop(), much of what else they do is different than the
apply leader.

Apply worker leader does:

ApplyWorkerMain()
walrcv_startstreaming()
LogicalRepApplyLoop()
launch table sync workers
walrcv_endstreaming()
proc_exit()

Table Sync workers master:

ApplyWorkerMain()
start_table_sync()
walrcv_create_slot()
copy_table()
walrcv_startstreaming()
start_apply()
LogicalRepApplyLoop()
walrcv_endstreaming()
proc_exit()

Now table sync workers need to loop back and do start_table_sync() again
for their new table.
You have done this in ApplyWorkerMain(). But I think that this could be
a separate main function since their main loop is effectively totally
different now than an apply worker leader.

Something like:

TableSyncWorkerMain()
TableSyncWorkerLoop()
start_table_sync()
walrcv_startstreaming()
LogicalRepApplyLoop()
walrcv_endstreaming()
wait_for_new_rel_assignment()
proc_exit()

You mainly have this structure, but it is a bit hidden and some of the
shared functions that previously may have made sense for table sync
worker and apply workers to share don't really make sense to share
anymore.

The main thing that table sync workers and apply workers share is the
logic in LogicalRepApplyLoop() (table sync workers use when they do
catchup), so perhaps we should make the other code separate?

Also on the topic of worker reuse, I was wondering if having workers
find their own next table assignment (as you have done in
process_syncing_tables_for_sync()) makes sense.

The way the whole system would work now (with your patch applied), as I
understand it, the apply leader would loop through the subscription rel
states and launch workers up to max_sync_workers_per_subscription for
every candidate table needing sync. The apply leader will continue to do
this, even though none of those workers would exit unless they die
unexpectedly. So, once it reaches max_sync_workers_per_subscription, it
won't launch any more workers.

When one of these sync workers is finished with a table (it is synced
and caught up), it will search through the subscription rel states
itself looking for a candidate table to work on.

It seems it would be common for workers to be looking through the
subscription rel states at the same time, and I don't really see how you
prevent races in who is claiming a relation to work on. Though you take
a shared lock on the LogicalRepWorkerLock, what if in between
logicalrep_worker_find() and updating my MyLogicalRepWorker->relid,
someone else also updates their relid to that relid. I don't think you
can update LogicalRepWorker->relid with only a shared lock.

I wonder if it is not better to have the apply leader, in
process_syncing_tables_for_apply(), first check for an existing worker
for the rel, then check for an available worker without an assignment,
then launch a worker?

Workers could then sleep after finishing their assignment and wait for
the leader to give them a new assignment.

Given an exclusive lock on LogicalRepWorkerLock, it may be okay for
workers to find their own table assignments from the subscriptionrel --
and perhaps this will be much more efficient from a CPU perspective. It
feels just a bit weird to have the code doing that buried in
process_syncing_tables_for_sync(). It seems like it should at least
return out to a main table sync worker loop in which workers loop
through finding a table and assigning it to themselves, syncing the
table, and catching the table up.

- Melanie

[1] 

Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-02-22 Thread Melih Mutlu
Hi Wang,

Thanks for reviewing.
Please see updated patches. [1]

wangw.f...@fujitsu.com , 7 Şub 2023 Sal, 10:28
tarihinde şunu yazdı:

> 1. In the function ApplyWorkerMain.
> I think we need to keep the worker name as "leader apply worker" in the
> comment
> like the current HEAD.
>

Done.


> I think in this case we also need to pop the error context stack before
> returning. Otherwise, I think we might use the wrong callback
> (apply error_callback) after we return from this function.
>

Done.


> 3. About the function UpdateSubscriptionRelReplicationSlot.
> This newly introduced function UpdateSubscriptionRelReplicationSlot does
> not
> seem to be invoked. Do we need this function?


Removed.

I think if 'need_full_snapshot' is false, it means we will create a snapshot
> that can read only catalogs. (see SnapBuild->building_full_snapshot)


Fixed.

```
> 'use' will use the snapshot for the current transaction executing the
> command.
> This option must be used in a transaction, and CREATE_REPLICATION_SLOT
> must be
> the first command run in that transaction.
> ```

So I think in the function CreateDecodingContext, when "need_full_snapshot"
> is
> true, we seem to need the following check, just like in the function
> CreateInitDecodingContext:

```
> if (IsTransactionState() &&
> GetTopTransactionIdIfAny() != InvalidTransactionId)
> ereport(ERROR,
> (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
>  errmsg("cannot create logical replication
> slot in transaction that has performed writes")));
> ```


You're right to "use" the snapshot, it must be the first command in the
transaction. And that check happens here [2]. CreateReplicationSnapshot has
also similar check.
I think the check you're referring to is needed to actually create a
replication slot and it performs whether the snapshot will be "used" or
"exported". This is not the case for CreateReplicationSnapshot.

It seems that we also need to invoke the function
> CheckLogicalDecodingRequirements in the new function
> CreateReplicationSnapshot,
> just like the function CreateReplicationSlot and the function
> StartLogicalReplication.


Added this check.

3. The invocation of startup_cb_wrapper in the function
> CreateDecodingContext.
> I think we should change the third input parameter to true when invoke
> function
> startup_cb_wrapper for CREATE_REPLICATION_SNAPSHOT. BTW, after applying
> patch
> v10-0002*, these settings will be inconsistent when sync workers use
> "CREATE_REPLICATION_SLOT" and "CREATE_REPLICATION_SNAPSHOT" to take
> snapshots.
> This input parameter (true) will let us disable streaming and two-phase
> transactions in function pgoutput_startup. See the last paragraph of the
> commit
> message for 4648243 for more details.


I'm not sure if "is_init" should be set to true. CreateDecodingContext only
creates a context for an already existing logical slot and does not
initialize new one.
I think inconsistencies between "CREATE_REPLICATION_SLOT" and
"CREATE_REPLICATION_SNAPSHOT" are expected since one creates a new
replication slot and the other does not.
CreateDecodingContext is also used in other places as well. Not sure how
this change would affect those places. I'll look into this more. Please let
me know if I'm missing something.


[1]
https://www.postgresql.org/message-id/CAGPVpCQmEE8BygXr%3DHi2N2t2kOE%3DPJwofn9TX0J9J4crjoXarQ%40mail.gmail.com
[2]
https://github.com/postgres/postgres/blob/master/src/backend/replication/walsender.c#L1108

Thanks,
-- 
Melih Mutlu
Microsoft


Re: [PATCH] Reuse Workers and Replication Slots during Logical Replication

2023-02-22 Thread Melih Mutlu
Hi Shveta,

Thanks for reviewing.
Please see attached patches.

shveta malik , 2 Şub 2023 Per, 14:31 tarihinde şunu
yazdı:

> On Wed, Feb 1, 2023 at 5:37 PM Melih Mutlu  wrote:
> for (int64 i = 1; i <= lastusedid; i++)
> {
> charoriginname_to_drop[NAMEDATALEN] = {0};
> snprintf(originname_to_drop,
> sizeof(originname_to_drop), "pg_%u_%lld", subid, (long long) i);
>  ...
>   }
>
> --Is it better to use the function
> 'ReplicationOriginNameForLogicalRep' here instead of sprintf, just to
> be consistent everywhere to construct origin-name?
>

ReplicationOriginNameForLogicalRep creates a slot name with current
"lastusedid" and doesn't accept that id as parameter. Here the patch needs
to check all possible id's.


> /* Drop replication origin */
> replorigin_drop_by_name(originname, true, false);
> }
>
> --Are we passing missing_ok as true (second arg) intentionally here in
> replorigin_drop_by_name? Once we fix the issue reported  in my earlier
> email (ASSERT), do you think it makes sense to  pass missing_ok as
> false here?
>

Yes, missing_ok is intentional. The user might be concurrently refreshing
the sub or the apply worker might already drop the origin at that point.
So, missing_ok is set to true.
This is also how origin drops before the worker exits are handled on HEAD
too. I only followed the same approach.


> --Do we need to palloc for each relation separately? Shall we do it
> once outside the loop and reuse it? Also pfree is not done for rstate
> here.
>

Removed palloc from the loop. No need to pfree here since the memory
context will be deleted with the next CommitTransactionCommand call.


> Can you please review the above flow (I have given line# along with),
> I think it could be problematic. We alloced prev_slotname, assigned it
> to slotname, freed prev_slotname and used slotname after freeing the
> prev_slotname.
> Also slotname is allocated some memory too, that is not freed.
>

Right, I used memcpy instead of assigning prev_slotname to slotname.
slotname is returned in the end and pfree'd later [1]

I also addressed your other reviews that I didn't explicitly mention in
this email. I simply applied the changes you pointed out. Also added some
more logs as well. I hope it's more useful now.

[1]
https://github.com/postgres/postgres/blob/master/src/backend/replication/logical/worker.c#L4359


Thanks,
-- 
Melih Mutlu
Microsoft


v8-0001-Add-replication-protocol-cmd-to-create-a-snapsho.patch
Description: Binary data


v11-0002-Reuse-Logical-Replication-Background-worker.patch
Description: Binary data


  1   2   >