RE: Support logical replication of DDLs

2023-04-03 Thread houzj.f...@fujitsu.com
On Friday, March 31, 2023 6:31 AM Peter Smith  wrote:

Hi,

> 
> It seems that lately, the patch attachments are lacking version numbers. It
> causes unnecessary confusion. For example, I sometimes fetch patches from
> this thread locally to "diff" them with previous patches to get a rough 
> overview
> of the changes -- that has now become more difficult.
> 
> Can you please reinstate the name convention of having version numbers for all
> patch attachments?
> 
> IMO *every* post that includes patches should unconditionally increment the
> patch version -- even if the new patches are just a rebase or some other 
> trivial
> change. Version numbers make it clear what patches are the latest, you will be
> easily able to unambiguously refer to them by name in subsequent posts, and
> when copied to your local computer they won't clash with any older copied
> patches.

The patch currently uses the date as the version number. I think the reason is
that multiple people are working on the patch, which causes the version number
to change very frequently (it soon becomes a very large number). So to avoid
this, we used the date to distinguish different versions.

Best Regards,
Hou zj


RE: Non-superuser subscription owners

2023-03-31 Thread houzj.f...@fujitsu.com
On Saturday, April 1, 2023 4:00 AM Robert Haas 

Hi,

> 
> On Thu, Mar 30, 2023 at 9:49 PM houzj.f...@fujitsu.com
>  wrote:
> > It looks like the super user check is out of a transaction, I haven't
> > checked why it only failed on one BF animal, but it seems we can put
> > the check into the transaction like the following:
> 
> That looks like a reasonable fix but I can't reproduce the problem locally. I
> thought the reason why that machine sees the problem might be that it uses
> -DRELCACHE_FORCE_RELEASE, but I tried that option here and the tests still 
> pass.
> Anyone ideas how to reproduce?

I think it's a timing problem because the superuser_arg() function caches the
roleid that was passed in last time, so it might not search the syscache and hit
the Assert() check each time. And in the regression test, the roleid cache
happened to be invalidated before the superuser_arg() call by some concurrent
ROLE change (maybe in subscription.sql and publication.sql).

I can reproduce it by using gdb and starting another session to change the ROLE.

When the apply worker starts, use gdb to block the apply worker in the
transaction before the superuser check. Then start another session and run ALTER
ROLE to invalidate the roleid cache in superuser_arg(), which will cause the
apply worker to search the syscache and hit the Assert().

--
origin_startpos = replorigin_session_get_progress(false);
B*  CommitTransactionCommand();

/* Is the use of a password mandatory? */
must_use_password = MySubscription->passwordrequired &&
! superuser_arg(MySubscription->owner);
--
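
The caching behaviour described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: every name prefixed with toy_ is hypothetical and is not a PostgreSQL symbol, the "catalog" is a stub that treats role OID 10 as the superuser, and instead of aborting like the real Assert() it counts lookups that happen outside a transaction.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid;
#define InvalidOid ((Oid) 0)

/* Toy stand-ins for backend state; none of these are real PostgreSQL symbols. */
static bool toy_in_transaction = false;
static Oid  toy_last_roleid = InvalidOid;      /* one-entry result cache */
static bool toy_last_roleid_is_super = false;
static int  toy_lookups_outside_xact = 0;      /* where the real Assert() would fire */

/* Stub catalog lookup: assumes role 10 is the only superuser. */
static bool
toy_catalog_lookup(Oid roleid)
{
    if (!toy_in_transaction)
        toy_lookups_outside_xact++;
    return roleid == 10;
}

/* Returns the cached answer when the same role is asked about again. */
bool
toy_superuser_arg(Oid roleid)
{
    assert(roleid != InvalidOid);
    if (toy_last_roleid == roleid)
        return toy_last_roleid_is_super;

    toy_last_roleid_is_super = toy_catalog_lookup(roleid);
    toy_last_roleid = roleid;
    return toy_last_roleid_is_super;
}

/* Models the effect of a concurrent ALTER ROLE: the cached entry is dropped. */
void
toy_invalidate_role_cache(void)
{
    toy_last_roleid = InvalidOid;
}
```

In this model, a call made outside a transaction is harmless as long as the cache entry for that role is still valid. Once toy_invalidate_role_cache() runs between the two calls, the second call falls through to the catalog lookup, which is the situation the gdb recipe above forces.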

Best Regards,
Hou zj


RE: Non-superuser subscription owners

2023-03-30 Thread houzj.f...@fujitsu.com
On Friday, March 31, 2023 12:05 AM Robert Haas  wrote:

Hi,

> 
> On Tue, Mar 28, 2023 at 1:52 PM Jeff Davis  wrote:
> > On Fri, 2023-03-24 at 00:17 -0700, Jeff Davis wrote:
> > > The other patch you posted seems like it makes a lot of progress in
> > > that direction, and I think that should go in first. That was one of
> > > the items I suggested previously[2], so thank you for working on
> > > that.
> >
> > The above is not a hard objection.
> 
> The other patch is starting to go in a direction that is going to have some
> conflicts with this one, so I went ahead and committed this one to avoid
> rebasing pain.

I noticed that the BF[1] reported a core dump after this commit.

#0  0xfd581864 in _lwp_kill () from /usr/lib/libc.so.12
#1  0xfd5817dc in raise () from /usr/lib/libc.so.12
#2  0xfd581c88 in abort () from /usr/lib/libc.so.12
#3  0x01e6c8d4 in ExceptionalCondition (conditionName=conditionName@entry=0x2007758 "IsTransactionState()", fileName=fileName@entry=0x20565c4 "catcache.c", lineNumber=lineNumber@entry=1208) at assert.c:66
#4  0x01e4e404 in SearchCatCacheInternal (cache=0xfd21e500, nkeys=nkeys@entry=1, v1=v1@entry=28985, v2=v2@entry=0, v3=v3@entry=0, v4=v4@entry=0) at catcache.c:1208
#5  0x01e4eea0 in SearchCatCache1 (cache=, v1=v1@entry=28985) at catcache.c:1162
#6  0x01e66e34 in SearchSysCache1 (cacheId=cacheId@entry=11, key1=key1@entry=28985) at syscache.c:825
#7  0x01e98c40 in superuser_arg (roleid=28985) at superuser.c:70
#8  0x01c657bc in ApplyWorkerMain (main_arg=) at worker.c:4552
#9  0x01c1ceac in StartBackgroundWorker () at bgworker.c:861
#10 0x01c23be0 in do_start_bgworker (rw=) at postmaster.c:5762
#11 maybe_start_bgworkers () at postmaster.c:5986
#12 0x01c2459c in process_pm_pmsignal () at postmaster.c:5149
#13 ServerLoop () at postmaster.c:1770
#14 0x01c26cdc in PostmasterMain (argc=argc@entry=4, argv=argv@entry=0xe0e4) at postmaster.c:1463
#15 0x01ee2c8c in main (argc=4, argv=0xe0e4) at main.c:200

It looks like the superuser check happens outside of a transaction. I haven't
checked why it only failed on one BF animal, but it seems we can put the check
into the transaction like the following:

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6fd674b5d6..08f10fc331 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4545,12 +4545,13 @@ ApplyWorkerMain(Datum main_arg)
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);
-   CommitTransactionCommand();
 
/* Is the use of a password mandatory? */
must_use_password = MySubscription->passwordrequired &&
!superuser_arg(MySubscription->owner);
 
+   CommitTransactionCommand();
+

[1] https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=mamba&dt=2023-03-30%2019%3A41%3A08

Best Regards,
Hou Zhijie


RE: Simplify some codes in pgoutput

2023-03-29 Thread houzj.f...@fujitsu.com
On Thursday, March 30, 2023 9:15 AM Peter Smith  wrote:
> 
> Hi Hou-san,
> 
> I tried to compare the logic of patch v3-0001 versus the original HEAD code.
> 
> IMO this patch logic is not exactly the same as before -- there are
> some subtle differences. I am not sure if these differences represent
> real problems or not.
> 
> Below are all my review comments:

Thanks for the check and comments.

> 
> ==
> 
> 1.
> /* Switch relation if publishing via root. */
> if (relentry->publish_as_relid != RelationGetRelid(relation))
> {
> Assert(relation->rd_rel->relispartition);
> ancestor = RelationIdGetRelation(relentry->publish_as_relid);
> targetrel = ancestor;
> }
> 
> ~
> 
> The "switch relation if publishing via root" logic is now happening
> first, whereas the original code was doing this after the slot
> assignments. AFAIK it does not matter, it's just a small point of
> difference.

I also think it doesn't matter.

> ==
> 
> 2.
> /* Convert tuple if needed. */
> if (relentry-> attrmap)
> {
> ...
> }
> 
> The "Convert tuple if needed." logic looks the same, but when it is
> executed is NOT the same. It could be a problem.
> 
> Previously, the conversion would only happen within the "Switch
> relation if publishing via root." condition. But the patch no longer
> has that extra condition -- now I think it attempts conversion every
> time regardless of "publishing via root".
> 
> I would expect the "publish via root" case to be less common, so even
> if the current code works, by omitting that check won't this patch
> have an unnecessary performance hit due to the extra conversions?

No, the conversions won't happen in normal cases because "if (relentry->attrmap)"
will pass only if we need to switch relation (publish via root).
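
A small standalone model of that point, as a sketch only: the Toy* types and toy_* functions are hypothetical and do not exist in pgoutput.c. It assumes the attribute map is built once when the cache entry is set up, and only when the change is published as a different relation whose column layout differs; the layouts_differ flag is an assumption of this sketch rather than something stated in the thread.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

/* Hypothetical stand-in for an attribute map. */
typedef struct ToyAttrMap
{
    int natts;
} ToyAttrMap;

/* Hypothetical stand-in for the per-relation cache entry. */
typedef struct ToyRelEntry
{
    Oid         publish_as_relid;
    ToyAttrMap *attrmap;    /* NULL unless a conversion is needed */
} ToyRelEntry;

static ToyAttrMap toy_map = {3};

/* The map is decided once, when the entry is initialized. */
void
toy_init_entry(ToyRelEntry *entry, Oid relid, Oid publish_as_relid,
               bool layouts_differ)
{
    assert(entry != NULL);
    entry->publish_as_relid = publish_as_relid;
    entry->attrmap = NULL;
    if (publish_as_relid != relid && layouts_differ)
        entry->attrmap = &toy_map;
}

/* The per-change test is then a plain pointer check. */
bool
toy_needs_conversion(const ToyRelEntry *entry)
{
    return entry->attrmap != NULL;
}
```

Under these assumptions, a table published as itself never has a map, so the per-change check costs one pointer comparison and no conversion is attempted.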

> ~~
> 
> 3.
> if (old_slot)
> old_slot =
> execute_attr_map_slot(relentry->attrmap,old_slot,MakeTupleTableSlot(tupde
> sc,
> ));
> 
> ~
> 
> The previous conversion code for UPDATE (shown above) was checking
> "if (old_slot)". Actually, I don't know why that check was even
> necessary before but it seems to have been accounting for a
> possibility that UPDATE might not have "oldtuple".

If the RI key wasn't updated, then it's possible the old tuple is null.

> 
> But this combination (if indeed it was possible) is not handled
> anymore with the patch code because the old_slot is unconditionally
> assigned in the same block doing this conversion.

I think this case is handled by the generic old slot conversion in the patch.

> ==
> 
> 4.
> AFAIK, the "if (change->data.tp.newtuple)" can only be true for INSERT
> or UPDATE, so the code would be better to include a sanity Assert.
> 
> SUGGESTION
> if (change->data.tp.newtuple)
> {
> Assert(action == REORDER_BUFFER_CHANGE_INSERT || action ==
> REORDER_BUFFER_CHANGE_UPDATE);
> ...
> }
> 
> ==
> 
> 5.
> AFAIK, the "if (change->data.tp.oldtuple)" can only be true for UPDATE
> or DELETE, so the code would be better to include a sanity Assert.
> 
> SUGGESTION
> if (change->data.tp.oldtuple)
> {
> Assert(action == REORDER_BUFFER_CHANGE_UPDATE || action ==
> REORDER_BUFFER_CHANGE_DELETE);
> ...
> 

It might be fine, but I am not sure if it's necessary to add this in this
patch, as we didn't have such an assertion before.

> 
> ==
> 
> 6.
> I suggest moving the "change->data.tp.oldtuple" check before the
> "change->data.tp.newtuple" check. I don't think it makes any
> difference, but it seems more natural IMO to have old before new.

Changed.

Best Regards,
Hou zj



v4-0001-simplify-the-code-in-pgoutput_change.patch
Description: v4-0001-simplify-the-code-in-pgoutput_change.patch


RE: Support logical replication of DDLs

2023-03-28 Thread houzj.f...@fujitsu.com
On Tuesday, March 28, 2023 1:41 PM Amit Kapila  wrote:
> 
> On Mon, Mar 27, 2023 at 5:37 PM Amit Kapila 
> wrote:
> >
> > On Mon, Mar 27, 2023 at 12:07 PM Amit Kapila 
> wrote:
> > >
> > > On Mon, Mar 27, 2023 at 2:52 AM Tom Lane  wrote:
> > > >
> > >
> > > > I suggest taking a couple of steps back from the minutiae of the
> > > > patch, and spending some hard effort thinking about how the thing
> > > > would be controlled in a useful fashion (that is, a real design
> > > > for the filtering that was mentioned at the very outset), and
> > > > about the security issues, and about how we could get to a committable
> patch.
> > > >
> > >
> > > Agreed. I'll try to summarize the discussion we have till now on
> > > this and share my thoughts on the same in a separate email.
> > >
> >
> > The idea to control what could be replicated is to introduce a new
> > publication option 'ddl' along with current options 'publish' and
> > 'publish_via_partition_root'. The values of this new option could be
> > 'table', 'function', 'all', etc. Here 'all' enables the replication of
> > all supported DDL commands. Example usage for this would be:
> > Example:
> > Create a new publication with all ddl replication enabled:
> >   CREATE PUBLICATION pub1 FOR ALL TABLES with (ddl = 'all');
> >
> > Enable table ddl replication for an existing Publication:
> >   ALTER PUBLICATION pub2 SET (ddl = 'table');
> >
> > This is what seems to have been discussed but I think we can even
> > extend it to support based on operations/commands, say one would like
> > to publish only 'create' and 'drop' of tables. Then we can extend the
> > existing publish option to have values like 'create', 'alter', and
> > 'drop'.
> >
> 
> The other idea could be to that for the new option ddl, we input command tags
> such that the replication will happen for those commands.
> For example, ALTER PUBLICATION pub2 SET (ddl = 'Create Table, Alter
> Table, ..'); This will obviate the need to have additional values like 
> 'create', 'alter',
> and 'drop' for publish option.
> 
> The other thought related to filtering is that one might want to filter DDLs 
> and
> or DMLs performed by specific roles in the future. So, we then need to
> introduce another option ddl_role, or something like that.
> 
> Can we think of some other kind of filter for DDL replication?

I am thinking of another generic syntax for DDL replication, like:

--
CREATE PUBLICATION pubname FOR object_type object_name with (publish = 'ddl_type');
--

To replicate DDLs that happened on a table, we don't need to add new syntax or
option, we can extend the value for the 'publish' option like:

To support replication of more non-table objects, we can follow the same style
and write it like:
--
CREATE PUBLICATION FOR FUNCTION f1 with (publish = 'alter'); -- function
CREATE PUBLICATION FOR ALL OPERATORS IN SCHEMA op_schema with (publish = 'drop'); -- operators
CREATE PUBLICATION FOR ALL OBJECTS with (publish = 'alter, create, drop'); -- everything
--

In this approach, we extend the publication grammar and users can
filter by object schema, object name, object type and DDL type. We can also add
more options to filter by role or other information in the future.



One more alternative could be like:

CREATE PUBLICATION xx FOR pub_create_alter_table WITH (ddl = 'table:create,alter'); -- it will publish create table and alter table operations.
CREATE PUBLICATION xx FOR pub_all_table WITH (ddl = 'table:all'); -- This means all table operations create/alter/drop
CREATE PUBLICATION xx FOR pub_all_table WITH (ddl = 'table'); -- same as above

This can be extended later to:
CREATE PUBLICATION xx FOR pub_all_func WITH (ddl = 'function:all');
CREATE PUBLICATION xx FOR pub_create_trigger WITH (ddl = 'trigger:create');

In this approach, we don't need to add more stuff in gram.y, and it
gives fine-grained control as well.
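
To make the 'objtype:op,op' idea concrete, here is a minimal standalone sketch of how such an option value could be split into an object type and a set of operations. It is not taken from any patch: DdlFilter, the DDL_* flags and parse_ddl_option() are hypothetical names, and the accepted keywords (create, alter, drop, all) are just the ones used in the examples above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical operation flags for the sketch. */
#define DDL_CREATE 0x1u
#define DDL_ALTER  0x2u
#define DDL_DROP   0x4u
#define DDL_ALL    (DDL_CREATE | DDL_ALTER | DDL_DROP)

typedef struct DdlFilter
{
    char     objtype[32];   /* e.g. "table", "function" */
    unsigned ops;           /* bitmask of DDL_* flags */
} DdlFilter;

/*
 * Parse "objtype" or "objtype:op[,op...]".  A bare object type means all
 * operations.  Returns false on an empty type, an unknown operation, or an
 * empty operation list.
 */
bool
parse_ddl_option(const char *value, DdlFilter *out)
{
    const char *colon;
    const char *p;
    size_t      typelen;

    assert(value != NULL && out != NULL);

    colon = strchr(value, ':');
    typelen = colon ? (size_t) (colon - value) : strlen(value);
    if (typelen == 0 || typelen >= sizeof(out->objtype))
        return false;

    memcpy(out->objtype, value, typelen);
    out->objtype[typelen] = '\0';
    out->ops = 0;

    if (colon == NULL)
    {
        out->ops = DDL_ALL;
        return true;
    }

    p = colon + 1;
    while (*p != '\0')
    {
        size_t len = strcspn(p, ",");

        if (len == 3 && strncmp(p, "all", 3) == 0)
            out->ops |= DDL_ALL;
        else if (len == 6 && strncmp(p, "create", 6) == 0)
            out->ops |= DDL_CREATE;
        else if (len == 5 && strncmp(p, "alter", 5) == 0)
            out->ops |= DDL_ALTER;
        else if (len == 4 && strncmp(p, "drop", 4) == 0)
            out->ops |= DDL_DROP;
        else
            return false;

        p += len;
        if (*p == ',')
            p++;
    }
    return out->ops != 0;
}
```

With this shape, 'table' and 'table:all' parse to the same filter, which matches the "same as above" note in the examples, and supporting a new object type later needs no grammar change, only a new accepted string.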

Thanks to Vignesh for sharing this idea off-list.

Best Regards,
Hou zj




RE: Support logical replication of DDLs

2023-03-27 Thread houzj.f...@fujitsu.com
On Monday, March 27, 2023 8:08 PM Amit Kapila  wrote:
> On Mon, Mar 27, 2023 at 12:07 PM Amit Kapila 
> wrote:
> >
> > On Mon, Mar 27, 2023 at 2:52 AM Tom Lane  wrote:
> > >
> >
> > > I suggest taking a couple of steps back from the minutiae of the
> > > patch, and spending some hard effort thinking about how the thing
> > > would be controlled in a useful fashion (that is, a real design for
> > > the filtering that was mentioned at the very outset), and about the
> > > security issues, and about how we could get to a committable patch.
> > >
> >
> > Agreed. I'll try to summarize the discussion we have till now on this
> > and share my thoughts on the same in a separate email.
> >
> 
> The idea to control what could be replicated is to introduce a new publication
> option 'ddl' along with current options 'publish' and
> 'publish_via_partition_root'. The values of this new option could be 'table',
> 'function', 'all', etc. Here 'all' enables the replication of all supported 
> DDL
> commands. Example usage for this would be:
> Example:
> Create a new publication with all ddl replication enabled:
>   CREATE PUBLICATION pub1 FOR ALL TABLES with (ddl = 'all');
> 
> Enable table ddl replication for an existing Publication:
>   ALTER PUBLICATION pub2 SET (ddl = 'table');
> 
> This is what seems to have been discussed but I think we can even extend it to
> support based on operations/commands, say one would like to publish only
> 'create' and 'drop' of tables. Then we can extend the existing publish option 
> to
> have values like 'create', 'alter', and 'drop'.
> 
> Another thing we are considering related to this is at what level these
> additional options should be specified. We have three variants FOR TABLE, FOR
> ALL TABLES, and FOR TABLES IN SCHEMA that enables replication. Now, for the
> sake of simplicity, this new option is discussed to be provided only with FOR
> ALL TABLES variant but I think we can provide it with other variants with some
> additional restrictions like with FOR TABLE, we can only specify 'alter' and
> 'drop' for publish option. Now, though possible, it brings additional
> complexity to support it with variants other than FOR ALL TABLES because then
> we need to ensure additional filtering and possible modification of the 
> content
> we have to send to downstream. So, we can even decide to first support it only
> FOR ALL TABLES variant.
> 
> The other point to consider for publish option 'ddl = table' is whether we 
> need
> to allow replicating dependent objects like say some user-defined type is used
> in the table. I guess the difficulty here would be to identify which 
> dependents
> we want to allow.
> 
> I think in the first version we should allow to replicate only some of the 
> objects
> instead of everything. For example, can we consider only allowing tables and
> indexes in the first version? Then extend it in a phased manner?

I think supporting table-related stuff in the first version makes sense, and the
patch size could be reduced to a suitable size. I also checked other DBs' designs
for reference: IBM DB2's DDL replication functionality[1] is similar to what
is proposed here (e.g. it only replicates table-related DDL: TABLE/INDEX/KEY ..).
We can extend it to support other non-table objects in a following patch set.

[1] https://www.ibm.com/docs/en/idr/11.4.0?topic=dr-how-q-capture-handles-ddl-operations-source-database

Best Regards,
Hou zj


RE: Initial Schema Sync for Logical Replication

2023-03-24 Thread houzj.f...@fujitsu.com
On Friday, March 24, 2023 11:01 PM Euler Taveira   wrote:

Hi,

> On Fri, Mar 24, 2023, at 8:57 AM, mailto:houzj.f...@fujitsu.com wrote:
> > First, I think the current publisher doesn't know the version number of
> > client(subscriber) so we need to check the feasibility of same. Also, having
> > client's version number checks doesn't seem to be a good idea.
> 
> walrcv_server_version().

I don't think this function works, as it only shows the server version (i.e.
that of the publisher/walsender).

> > Besides, I thought about the problems that will happen if we try to support
> > replicating New PG to older PG. The following examples assume that we 
> > support the
> > DDL replication in the mentioned PG.
> > 
> > 1) Assume we want to replicate from a newer PG to a older PG where partition
> >table has not been introduced. I think even if the publisher is aware of
> >that, it doesn't have a good way to transform the partition related 
> > command,
> >maybe one could say we can transform that to inherit table, but I feel 
> > that
> >introduces too much complexity.
> > 
> > 2) Another example is generated column. To replicate the newer PG which has
> >this feature to a older PG without this. I am concerned that is there a 
> > way
> >to transform this without causing inconsistent behavior.
> > 
> > Even if we decide to simply skip sending such unsupported commands or skip
> > applying them, then it's likely that the following dml replication will 
> > cause
> > data inconsistency.
>
> As I mentioned in a previous email [1], the publisher can contain code to
> decide if it can proceed or not, in case you are doing a downgrade. I said
> downgrade but it can also happen if we decide to deprecate a syntax. For
> example, when WITH OIDS was deprecated, pg_dump treats it as an acceptable
> removal. The transformation can be (dis)allowed by the protocol version or
> another constant [2].

If most of the new DDL-related features can't be transformed for an
old subscriber, I don't see a point in supporting this use case.

I think cases like the removal of WITH OIDS are rare enough that we don't need
to worry about them, and they don't affect data consistency. But new DDL
features are different.

Not only features like partitioning or generated columns, but also features like
nulls_not_distinct are tricky to transform without causing inconsistent
behavior.
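
As a rough illustration of why a version gate alone does not solve this, the sketch below models the kind of check a publisher would need. Everything here is hypothetical (the DdlFeature table, the feature names, subscriber_supports()); the only facts relied on are the releases in which the three features mentioned above appeared (declarative partitioning in 10, generated columns in 12, NULLS NOT DISTINCT in 15) and the server_version_num style of encoding versions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical table: DDL feature and the first version that understands it. */
typedef struct DdlFeature
{
    const char *name;
    int         min_version_num;    /* server_version_num style, e.g. 120000 */
} DdlFeature;

static const DdlFeature ddl_features[] = {
    {"partitioned_table", 100000},
    {"generated_column", 120000},
    {"nulls_not_distinct", 150000},
};

/*
 * Returns true if a subscriber of the given version can apply DDL that uses
 * the feature as-is.  Unknown features are treated as unsupported.
 */
bool
subscriber_supports(const char *feature, int subscriber_version_num)
{
    size_t i;

    assert(feature != NULL);
    for (i = 0; i < sizeof(ddl_features) / sizeof(ddl_features[0]); i++)
    {
        if (strcmp(ddl_features[i].name, feature) == 0)
            return subscriber_version_num >= ddl_features[i].min_version_num;
    }
    return false;
}
```

The check itself is trivial; the open question in the thread is what to do when it returns false. Rejecting the replication is safe, while skipping or rewriting the command leaves the subscriber with a table whose behaviour differs from the publisher's, which is where the later DML can go out of sync.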

> > So, it seems we cannot completely support this use case, there would be some
> > limitations. Personally, I am not sure if it's worth introducing complexity 
> > to
> > support it partially.
> 
> Limitations are fine; they have different versions. I wouldn't like to forbid
> downgrade just because I don't want to maintain compatibility with previous
> versions. IMO it is important to be able to downgrade in case of any
> incompatibility with an application. You might argue that this isn't possible
> due to time or patch size and that there is a workaround for it but I wouldn't
> want to close the door for downgrade in the future.

The biggest problem is the data inconsistency that it would cause. I am not
aware of a generic solution for replicating newly introduced DDLs to an old
subscriber that wouldn't cause data inconsistency. And apart from that, IMO the
complexity and maintainability of the feature also matter.

Best Regards,
Hou zj


RE: Initial Schema Sync for Logical Replication

2023-03-24 Thread houzj.f...@fujitsu.com
On Friday, March 24, 2023 12:02 AM Euler Taveira   wrote:
> 
> On Thu, Mar 23, 2023, at 8:44 AM, Amit Kapila wrote:
> > On Thu, Mar 23, 2023 at 2:48 AM Euler Taveira  
> > wrote:
> > >
> > > On Tue, Mar 21, 2023, at 8:18 AM, Amit Kapila wrote:
> > >
> > > Now, how do we avoid these problems even if we have our own version of
> > > functionality similar to pg_dump for selected objects? I guess we will
> > > face similar problems. If so, we may need to deny schema sync in any
> > > such case.
> > >
> > > There are 2 approaches for initial DDL synchronization:
> > >
> > > 1) generate the DDL command on the publisher, stream it and apply it 
> > > as-is on
> > > the subscriber;
> > > 2) generate a DDL representation (JSON, for example) on the publisher, 
> > > stream
> > > it, transform it into a DDL command on subscriber and apply it.
> > >
> > > The option (1) is simpler and faster than option (2) because it does not
> > > require an additional step (transformation). However, option (2) is more
> > > flexible than option (1) because it allow you to create a DDL command 
> > > even if a
> > > feature was removed from the subscriber and the publisher version is less 
> > > than
> > > the subscriber version or a feature was added to the publisher and the
> > > publisher version is greater than the subscriber version.
> > >
> > 
> > Is this practically possible? Say the publisher has a higher version
> > that has introduced a new object type corresponding to which it has
> > either a new catalog or some new columns in the existing catalog. Now,
> > I don't think the older version of the subscriber can modify the
> > command received from the publisher so that the same can be applied to
> > the subscriber because it won't have any knowledge of the new feature.
> > In the other case where the subscriber is of a newer version, we
> > anyway should be able to support it with pg_dump as there doesn't
> > appear to be any restriction with that, am, I missing something?
> I think so (with some limitations). Since the publisher knows the subscriber
> version, publisher knows that the subscriber does not contain the new object
> type then publisher can decide if this case is critical (and reject the
> replication) or optional (and silently not include the feature X -- because it
> is not essential for logical replication). If required, the transformation
> should be done on the publisher.

I am not sure if it's feasible to support the use case of replicating DDL to an
old subscriber.

First, I think the current publisher doesn't know the version number of the
client (subscriber), so we need to check the feasibility of that. Also, having
client version number checks doesn't seem to be a good idea.

Besides, I thought about the problems that will happen if we try to support
replicating from a newer PG to an older PG. The following examples assume that
we support DDL replication in the mentioned PG versions.

1) Assume we want to replicate from a newer PG to an older PG where partitioned
   tables have not been introduced. I think even if the publisher is aware of
   that, it doesn't have a good way to transform the partition-related commands.
   Maybe one could say we can transform them to inherited tables, but I feel that
   introduces too much complexity.

2) Another example is generated columns. To replicate from a newer PG which has
   this feature to an older PG without it, I am concerned about whether there is
   a way to transform this without causing inconsistent behavior.

Even if we decide to simply skip sending such unsupported commands or skip
applying them, it's likely that the following DML replication will cause
data inconsistency.

So, it seems we cannot completely support this use case; there would be some
limitations. Personally, I am not sure if it's worth introducing complexity to
support it partially.

Best Regards,
Hou zj


RE: Simplify some codes in pgoutput

2023-03-22 Thread houzj.f...@fujitsu.com
On Monday, March 20, 2023 5:20 PM houzj.f...@fujitsu.com wrote:
> 
> On Thursday, March 16, 2023 12:30 PM Amit Kapila 
> wrote:
> 
> >
> > On Wed, Mar 15, 2023 at 2:00 PM houzj.f...@fujitsu.com
> >  wrote:
> > >
> > > I noticed that there are some duplicated codes in pgoutput_change()
> > function
> > > which can be simplified, and here is an attempt to do that.
> > >
> >
> > For REORDER_BUFFER_CHANGE_DELETE, when the old tuple is missing, after
> > this patch, we will still send BEGIN and do OutputPluginWrite, etc.
> > Also, it will try to perform row_filter when none of old_slot or
> > new_slot is set. I don't know for which particular case we have s
> > handling missing old tuples for deletes but that might require changes
> > in your proposed patch.
> 
> I researched this a bit. I think the old tuple will be null only if the 
> modified table
> doesn't have PK or RI when the DELETE happens (referred to the heap_delete()),
> but in that case the DELETE won't be allowed to be replicated(e.g. the DELETE
> will either error out or be filtered by table level filter in 
> pgoutput_change).
> 
> I also checked this for system table and in that case it is null but 
> reorderbuffer
> doesn't forward it. For user_catalog_table, similarily, the DELETE should be
> filtered by table filter in pgoutput_change as well.
> 
> So, I think we can remove this check and log.
> And here is the new version patch which removes that for now.

After rethinking this, it seems better to leave this check in for now. Although
it may be unnecessary, we can remove it later in a separate patch when we
are sure about this. So, here is a patch that adds this check back.
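
For readers following along, the check being kept can be modelled roughly as below. This is a standalone sketch, not the code in pgoutput_change(): ToyAction, ToyChange and toy_should_send() are hypothetical names, and the only behaviour it models is that a DELETE without an old tuple is dropped before anything (including BEGIN) is sent.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the change kinds and the change record. */
typedef enum ToyAction
{
    TOY_INSERT,
    TOY_UPDATE,
    TOY_DELETE
} ToyAction;

typedef struct ToyChange
{
    ToyAction   action;
    const void *oldtuple;   /* may be NULL */
    const void *newtuple;   /* may be NULL */
} ToyChange;

/*
 * Returns true when the change should go on to row filtering and output.
 * A DELETE with no old tuple has nothing to identify the row by, so it is
 * skipped early, before BEGIN would be sent for the transaction.
 */
bool
toy_should_send(const ToyChange *change)
{
    assert(change != NULL);
    if (change->action == TOY_DELETE && change->oldtuple == NULL)
        return false;
    return true;
}
```

An UPDATE with a NULL old tuple still passes this check, consistent with the case discussed elsewhere in the thread where the replica identity key was not modified.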

Best Regards,
Hou zj



v3-0001-simplify-the-code-in-pgoutput_change.patch
Description: v3-0001-simplify-the-code-in-pgoutput_change.patch


RE: Initial Schema Sync for Logical Replication

2023-03-22 Thread houzj.f...@fujitsu.com
On Wednesday, March 22, 2023 1:16 PM Amit Kapila  
wrote:
> 
> On Wed, Mar 22, 2023 at 8:29 AM Masahiko Sawada
>  wrote:
> >
> > On Tue, Mar 21, 2023 at 8:18 PM Amit Kapila 
> wrote:
> > >
> > > On Tue, Mar 21, 2023 at 7:32 AM Euler Taveira  wrote:
> > >
> > > > You should
> > > > exclude them removing these objects from the TOC before running
> > > > pg_restore or adding a few pg_dump options to exclude these
> > > > objects. Another issue is related to different version. Let's say
> > > > the publisher has a version ahead of the subscriber version, a new
> > > > table syntax can easily break your logical replication setup. IMO
> > > > pg_dump doesn't seem like a good solution for initial synchronization.
> > > >
> > > > Instead, the backend should provide infrastructure to obtain the
> > > > required DDL commands for the specific (set of) tables. This can
> > > > work around the issues from the previous paragraph:
> > > >
> > > ...
> > > > * don't need to worry about different versions.
> > > >
> > >
> > > AFAICU some of the reasons why pg_dump is not allowed to dump from
> > > the newer version are as follows: (a) there could be more columns in
> > > the newer version of the system catalog and then Select * type of
> > > stuff won't work because the client won't have knowledge of
> > > additional columns. (b) the newer version could have new features
> > > (represented by say new columns in existing catalogs or new
> > > catalogs) that the older version of pg_dump has no knowledge of and
> > > will fail to get that data and hence an inconsistent dump. The
> > > subscriber will easily be not in sync due to that.
> > >
> > > Now, how do we avoid these problems even if we have our own version
> > > of functionality similar to pg_dump for selected objects? I guess we
> > > will face similar problems.
> >
> > Right. I think that such functionality needs to return DDL commands
> > that can be executed on the requested version.
> >
> > > If so, we may need to deny schema sync in any such case.
> >
> > Yes. Do we have any concrete use case where the subscriber is an older
> > version, in the first place?
> >
> 
> As per my understanding, it is mostly due to the reason that it can work 
> today.
> Today, during an off-list discussion with Jonathan on this point, he pointed 
> me
> to a similar incompatibility in MySQL replication. See the "SQL
> incompatibilities" section in doc[1]. Also, please note that this applies not 
> only
> to initial sync but also to schema sync during replication. I don't think it 
> would
> be feasible to keep such cross-version compatibility for DDL replication.
> 
> Having said above, I don't intend that we must use pg_dump from the
> subscriber for the purpose of initial sync. I think the idea at this stage is 
> to
> primarily write a POC patch to see what difficulties we may face. The other
> options that we could try out are (a) try to duplicate parts of pg_dump code 
> in
> some way (by extracting required
> code) for the subscription's initial sync, or (b) have a common code (probably
> as a library or some other way) for the required functionality. There could be
> more possibilities that we may not have thought of yet. But the main point is
> that for approaches other than using pg_dump, we should consider ways to
> avoid duplicity of various parts of its code. Due to this, I think before 
> ruling out
> using pg_dump, we should be clear about its risks and limitations.

I thought about some possible problems with the design of using pg_dump.

1) According to the design, it will internally call pg_dump when creating a
subscription, but calling pg_dump requires a powerful user.
Currently, it may not be a problem because CREATE SUBSCRIPTION also requires
superuser. But people have recently discussed allowing non-superusers to
create subscriptions[1]. If that is accepted, then it seems not great to
internally use a superuser to call pg_dump while the user creating the
subscription is a non-superuser.

2) I think it's possible that some cloud DB services don't allow users to use
the client commands (pg_dump, ..) directly, and the user that logs in to the
database may not have permission to execute the client commands.

[1] https://www.postgresql.org/message-id/flat/20230308194743.23rmgjgwahh4i4rg%40awork3.anarazel.de

Best Regards,
Hou zj



RE: Simplify some codes in pgoutput

2023-03-20 Thread houzj.f...@fujitsu.com
On Friday, March 17, 2023 11:49 AM Peter Smith  wrote:
> 
> On Wed, Mar 15, 2023 at 7:30 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > Hi,
> >
> > I noticed that there are some duplicated codes in pgoutput_change()
> > function which can be simplified, and here is an attempt to do that.
> 
> Hi Hou-san.
> 
> I had a quick look at the 0001 patch.
> 
> Here are some first comments.

Thanks for the comments.

> 
> ==
> 
> 1.
> + if (relentry->attrmap)
> + old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
> + MakeTupleTableSlot(RelationGetDescr(targetrel),
> + ));
> 
> 1a.
> IMO maybe it was more readable before when there was a separate 'tupdesc'
> variable, instead of trying to squeeze too much into one statement.
> 
> 1b.
> Should you retain the old comments that said "/* Convert tuple if needed. */"

Added.

> ~~~
> 
> 2.
> - if (old_slot)
> - old_slot = execute_attr_map_slot(relentry->attrmap,
> - old_slot,
> - MakeTupleTableSlot(tupdesc, ));
> 
> The original code for REORDER_BUFFER_CHANGE_UPDATE was checking "if
> (old_slot)" but that check seems no longer present. Is it OK?

I think the logic is the same.

> 
> ~~~
> 
> 3.
> - /*
> - * Send BEGIN if we haven't yet.
> - *
> - * We send the BEGIN message after ensuring that we will actually
> - * send the change. This avoids sending a pair of BEGIN/COMMIT
> - * messages for empty transactions.
> - */
> 
> That original longer comment has been replaced with just "/* Send BEGIN if we
> haven't yet */". Won't it be better to retain the more informative longer
> comment?

Added.

> ~~~
> 
> 4.
> +
> +cleanup:
>   if (RelationIsValid(ancestor))
>   {
>   RelationClose(ancestor);
> 
> ~
> 
> Since you've introduced a new label 'cleanup:' then IMO you can remove that
> old comment "/* Cleanup */".
> 
Removed.

Best Regards,
Hou zj


RE: Simplify some codes in pgoutput

2023-03-20 Thread houzj.f...@fujitsu.com
On Thursday, March 16, 2023 12:30 PM Amit Kapila  
wrote:

> 
> On Wed, Mar 15, 2023 at 2:00 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > I noticed that there are some duplicated codes in pgoutput_change()
> function
> > which can be simplified, and here is an attempt to do that.
> >
> 
> For REORDER_BUFFER_CHANGE_DELETE, when the old tuple is missing, after
> this patch, we will still send BEGIN and do OutputPluginWrite, etc.
> Also, it will try to perform row_filter when none of old_slot or
> new_slot is set. I don't know for which particular case we have s
> handling missing old tuples for deletes but that might require changes
> in your proposed patch.

I researched this a bit. I think the old tuple will be null only if the
modified table doesn't have a PK or RI when the DELETE happens (see
heap_delete()), but in that case the DELETE won't be allowed to be
replicated (e.g. the DELETE will either error out or be filtered by the
table-level filter in pgoutput_change).

I also checked this for system tables; in that case it is null, but
reorderbuffer doesn't forward it. For user_catalog_table, similarly, the
DELETE should be filtered by the table filter in pgoutput_change as well.

So, I think we can remove this check and the log message.
Here is the new version of the patch, which removes them for now.

Best Regards,
Hou zj


v2-0001-simplify-the-code-in-pgoutput_change.patch
Description: v2-0001-simplify-the-code-in-pgoutput_change.patch


RE: Allow logical replication to copy tables in binary format

2023-03-15 Thread houzj.f...@fujitsu.com
Hi,

Thanks for updating the patch, I think it is a useful feature.

I looked at the v15 patch and the patch looks mostly good to me.
Here are a few comments:

1.
+   {
+   appendStringInfo(, " WITH (FORMAT binary)");

We could use appendStringInfoString here.


2.
+# It should fail
+$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/);
...
+# Cannot sync due to type mismatch
+$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/);
...
+# Ensure the COPY command is executed in text format on the publisher
+$node_publisher->wait_for_log(qr/LOG: ( [a-z0-9]+:)? COPY (.+)? TO STDOUT\n/);

I think it would be better to pass the log offset when using wait_for_log,
because otherwise it will check the whole log file to find the target message.
This might not be a big problem, but there is a risk of matching an unexpected
log message that was generated by previous commands.

Best Regards,
Hou zj


Simplify some codes in pgoutput

2023-03-15 Thread houzj.f...@fujitsu.com
Hi,

I noticed that there is some duplicated code in the pgoutput_change() function
which can be simplified, and here is an attempt to do that.

Best Regards,
Hou Zhijie



0001-simplify-the-code-in-pgoutput_change.patch
Description: 0001-simplify-the-code-in-pgoutput_change.patch


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2023-03-13 Thread houzj.f...@fujitsu.com
On Monday, March 13, 2023 2:23 PM Önder Kalacı   wrote:
Hi,

> > >
> > >
> > > Reading [1], I think I can follow what you suggest. So, basically,
> > > if the leftmost column is not filtered, we have the following:
> > >
> > >>  but the entire index would have to be scanned, so in most cases the 
> > >> planner
> > would prefer a sequential table scan over using the index.
> > >
> > >
> > > So, in our case, we could follow a similar approach. If the leftmost 
> > > column of
> > the index
> > > is not sent over the wire from the pub, we can prefer the sequential scan.
> > >
> > > Is my understanding of your suggestion accurate?
> > >
> > 
> > Yes. I request an opinion from Shi-San who has reported the problem.
> > 
> 
> I also agree with this.
> And I think we can mention this in the comments if we do so.
> 
> Already commented on FindUsableIndexForReplicaIdentityFull() on v44.

Thanks for updating the patch.

I noticed one problem:

+static bool
+RemoteRelContainsLeftMostColumnOnIdx(IndexInfo  *indexInfo,
+                                     LogicalRepRelation  *remoterel)
+{
+   AttrNumber  keycol;
+
+   if (indexInfo->ii_NumIndexAttrs < 1)
+   return false;
+
+   keycol = indexInfo->ii_IndexAttrNumbers[0];
+   if (!AttributeNumberIsValid(keycol))
+   return false;
+
+   return bms_is_member(keycol-1, remoterel->attkeys);
+}

This function uses the local column number (keycol) to match against the
remote column numbers (attkeys). I think this will cause problems if the column
order between pub/sub doesn't match. For example:

---
- pub
CREATE TABLE test_replica_id_full (x int, y int);
ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;
CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full;
- sub
CREATE TABLE test_replica_id_full (z int, y int, x int);
CREATE unique INDEX idx ON test_replica_id_full(z);
CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION 'dbname=postgres port=5432' PUBLICATION tap_pub_rep_full;
---

I think we need to use attrmap->attnums to convert the column number before
comparing. Just for reference, I attach a diff (0001) which I noted down when
trying to fix the problem.
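
To illustrate the mismatch, here is a minimal standalone sketch in plain C. It
is not the attached diff and not PostgreSQL code: it assumes a simplified model
in which the remote key columns are a bitmask and the attribute map is a plain
array, indexed by local column position, that holds the 0-based remote column
index or -1 when the publisher has no such column. All names are hypothetical.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Simplified model (assumption): bit i of remote_keys is set when remote
 * column i (0-based) is part of the replica identity sent by the publisher.
 */
typedef uint32_t remote_keys_t;

/* Buggy variant: treats the local column number as a remote column index. */
bool
leftmost_col_is_remote_key_naive(int local_keycol, remote_keys_t remote_keys)
{
    return (remote_keys & (UINT32_C(1) << (local_keycol - 1))) != 0;
}

/*
 * Fixed variant: translate the local column number through the attribute
 * map first. attnums[i] holds the remote index for local column i + 1, or
 * -1 when the publisher has no such column.
 */
bool
leftmost_col_is_remote_key_mapped(int local_keycol, const int *attnums,
                                  int natts, remote_keys_t remote_keys)
{
    int remote_idx;

    if (local_keycol < 1 || local_keycol > natts)
        return false;

    remote_idx = attnums[local_keycol - 1];
    if (remote_idx < 0)
        return false;       /* column exists only on the subscriber */

    return (remote_keys & (UINT32_C(1) << remote_idx)) != 0;
}
```

For the tables above, the publisher has (x, y) and the subscriber has
(z, y, x), so the map in this model would be {-1, 1, 0} and the remote keys
would be 0x3 under REPLICA IDENTITY FULL. For the index on z (local column 1)
the naive check answers true, while the mapped check answers false.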

Besides, I also looked at the "WIP: Optimize for non-pkey / non-RI unique
indexes" patch; I think it has a similar problem with the column matching.
Another thing I think we can improve in this WIP patch is to cache the result
of IsIdxSafeToSkipDuplicates() instead of computing it for each UPDATE, because
the cost of this function becomes bigger after applying this patch. For
reference, I tried to improve the WIP accordingly, and here is a slightly
modified version of this WIP (0002). Feel free to modify or merge it if needed.
Thanks to Shi-san for helping to finish these fixes.


Best Regards,
Hou zj
From 3e7f94ff0492a98d2d7970146d3a9a1a43cecd92 Mon Sep 17 00:00:00 2001
From: Hou Zhijie 
Date: Mon, 13 Mar 2023 17:36:55 +0800
Subject: [PATCH 2/2] WIP: Optimize for non-pkey / non-RI unique indexes

---
 src/backend/executor/execReplication.c |   7 +-
 src/backend/replication/logical/relation.c | 150 +
 src/backend/replication/logical/worker.c   |  40 --
 src/include/executor/executor.h|   2 +
 src/include/replication/logicalrelation.h  |   4 +-
 5 files changed, 158 insertions(+), 45 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index cd17be0681..ba19ef2bf8 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -135,6 +135,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
  */
 bool
 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
+                             bool isIdxSafeToSkipDuplicates,
                              LockTupleMode lockmode,
                              TupleTableSlot *searchslot,
                              TupleTableSlot *outslot)
@@ -147,13 +148,10 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
    Relation    idxrel;
    bool        found;
    TypeCacheEntry **eq = NULL;
-   bool        isIdxSafeToSkipDuplicates;
 
/* Open the index. */
idxrel = index_open(idxoid, RowExclusiveLock);
 
-   isIdxSafeToSkipDuplicates = IsIdxSafeToSkipDuplicates(rel, idxoid);
-
InitDirtySnapshot(snap);
 
/* Build scan key. */
@@ -171,8 +169,7 @@ retry:
while (index_getnext_slot(scan, ForwardScanDirection, outslot))
{
/*
-* Avoid expensive equality check if the index is primary key or
-* replica identity index.
+* Avoid expensive equality check if the index is unique.
 */
if (!isIdxSafeToSkipDuplicates)
  

RE: Support logical replication of DDLs

2023-03-09 Thread houzj.f...@fujitsu.com
On Thur, Mar 9, 2023 10:27 AM Wang, Wei/王 威 
> On Mon, Mar 6, 2023 18:17 PM Wang, Wei/王 威 
> wrote:
> > On Mon, Mar 6, 2023 14:34 AM Ajin Cherian  wrote:
> > > Changes are in patch 1 and patch 2
> >
> > Thanks for updating the patch set.
> >
> > Here are some comments:
> 
> Here are some more comments for v-75-0002* patch:

Thanks for your comments.

> 1. In the function deparse_AlterRelation
> + if ((sub->address.objectId != relId &&
> +  sub->address.objectId != InvalidOid) &&
> + !(subcmd->subtype == AT_AddConstraint &&
> +   subcmd->recurse) &&
> + istable)
> + continue;
> I think when we execute the command "ALTER TABLE ... CLUSTER ON" 
> (subtype is AT_ClusterOn), this command will be skipped for parsing. I 
> think we need to parse this command here.
> 
> I think we are skipping some needed parsing due to this check, such as 
> [1].#1 and the AT_ClusterOn command mentioned above. After reading the 
> thread, I think the purpose of this check is to fix the bug in [2] 
> (see the last point in [2]).
> I think maybe we could modify this check to `continue` when
> sub->address.objectId and relId are inconsistent and sub->address.objectId
> is a child (inherited or partition) table. What do you think?

Fixed as suggested.

> ~~~
> 
> 2. In the function deparse_CreateStmt
> I think when we execute the following command:
> `CREATE TABLE tbl (a int GENERATED ALWAYS AS (1) STORED);`
> the deparsed result is:
> `CREATE  TABLE  public.tbl (a pg_catalog.int4 STORAGE plain GENERATED ALWAYS AS 1 STORED);`
> I think the parentheses around generation_expr (I mean `1`) are missing,
> which would cause a syntax error.

Fixed.

> ~~~
> 
> 3. In the function deparse_IndexStmt
> I think we missed parsing of options [NULLS NOT DISTINCT] in the following
> command:
> ```
> CREATE UNIQUE INDEX ... ON table_name ... NULLS NOT DISTINCT;
> ```
> I think we could check this option via node->nulls_not_distinct.

Fixed.

Best Regards,
Hou zj


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2023-03-07 Thread houzj.f...@fujitsu.com
On Wednesday, March 8, 2023 2:51 PM houzj.f...@fujitsu.com 
 wrote:
> 
> On Tuesday, March 7, 2023 9:47 PM Önder Kalacı 
> wrote:
> 
> Hi,
> 
> > > > Let me give an example to demonstrate why I thought something is fishy
> here:
> > > >
> > > > Imagine rel has a (non-default) REPLICA IDENTITY with Oid=.
> > > > Imagine the same rel has a PRIMARY KEY with Oid=.
> > > >
> >
> > Hmm, alright, this is syntactically possible, but not sure if any user
> > would do this. Still thanks for catching this.
> >
> > And, you are right, if a user has created such a schema,
> > IdxIsRelationIdentityOrPK() would return the wrong result and we'd use
> sequential scan instead of index scan.
> > This would be a regression. I think we should change the function.
> 
> I am looking at the latest patch and have a question about the following code.
> 
>     /* Try to find the tuple */
> -   if (index_getnext_slot(scan, ForwardScanDirection, outslot))
> +   while (index_getnext_slot(scan, ForwardScanDirection, outslot))
>     {
> -       found = true;
> +       /*
> +        * Avoid expensive equality check if the index is primary key or
> +        * replica identity index.
> +        */
> +       if (!idxIsRelationIdentityOrPK)
> +       {
> +           if (eq == NULL)
> +           {
> +#ifdef USE_ASSERT_CHECKING
> +               /* apply assertions only once for the input idxoid */
> +               IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
> +
> +               Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
> +#endif
> +
> +               /*
> +                * We only need to allocate once. This is allocated within per
> +                * tuple context -- ApplyMessageContext -- hence no need to
> +                * explicitly pfree().
> +                */
> +               eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
> +           }
> +
> +           if (!tuples_equal(outslot, searchslot, eq))
> +               continue;
> +       }
> 
> IIRC, it invokes tuples_equal in all cases unless we are using the replica
> identity key or primary key to scan. But there seem to be some other cases
> where tuples_equal looks unnecessary.
> 
> For example, suppose the table on the subscriber doesn't have a PK or RI key
> but has a not-null, non-deferrable, unique key. If the apply worker chooses
> this index to do the scan, it seems we can skip tuples_equal as well.
> 
> --Example
> pub:
> CREATE TABLE test_replica_id_full (a int, b int not null);
> ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;
> CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full;
> 
> sub:
> CREATE TABLE test_replica_id_full (a int, b int not null);
> CREATE UNIQUE INDEX test_replica_id_full_idx ON test_replica_id_full(b);

Thinking about it again, this example is incorrect, sorry. I mean the case
where all the columns of the tuple to be compared are in the unique index on
the subscriber side, like:

--Example
pub:
CREATE TABLE test_replica_id_full (a int);
ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;
CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full;

sub:
CREATE TABLE test_replica_id_full (a int not null);
CREATE UNIQUE INDEX test_replica_id_full_idx ON test_replica_id_full(a);
--
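
One way to picture the difference between the earlier example and this one is
the following standalone sketch in plain C. It is not PostgreSQL code: the
struct, the function name and the bitmask representation of columns are all
hypothetical, and the exact set of conditions under which the comparison can be
skipped is an assumption based on the description above.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified description of an index on the subscriber. */
typedef struct UniqueIndexModel
{
    bool     is_unique;      /* created as a unique index */
    bool     is_deferrable;  /* uniqueness check may be deferred */
    bool     keys_not_null;  /* every key column is declared NOT NULL */
    uint32_t key_columns;    /* bit i set: table column i is an index key */
} UniqueIndexModel;

/*
 * Return true when, under the assumptions of this sketch, an index match
 * already implies that all compared columns are equal, so a separate
 * tuples_equal-style comparison would be redundant.
 */
bool
can_skip_tuples_equal(const UniqueIndexModel *idx, uint32_t compared_columns)
{
    if (!idx->is_unique || idx->is_deferrable || !idx->keys_not_null)
        return false;

    /* Every compared column must also be a key column of the index. */
    return (compared_columns & ~idx->key_columns) == 0;
}
```

With the earlier example (columns a and b are compared, unique index only on
b) the function returns false, because column a is not covered by the index.
With the example above (only column a, unique index on a) it returns true.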

Best Regards,
Hou zj


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2023-03-07 Thread houzj.f...@fujitsu.com
On Tuesday, March 7, 2023 9:47 PM Önder Kalacı   wrote:

Hi,

> > > Let me give an example to demonstrate why I thought something is fishy 
> > > here:
> > >
> > > Imagine rel has a (non-default) REPLICA IDENTITY with Oid=.
> > > Imagine the same rel has a PRIMARY KEY with Oid=.
> > >
> 
> Hmm, alright, this is syntactically possible, but not sure if any user 
> would do this. Still thanks for catching this.
> 
> And, you are right, if a user has created such a schema, 
> IdxIsRelationIdentityOrPK() 
> would return the wrong result and we'd use sequential scan instead of index 
> scan. 
> This would be a regression. I think we should change the function. 

I am looking at the latest patch and have a question about the following code.

    /* Try to find the tuple */
-   if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+   while (index_getnext_slot(scan, ForwardScanDirection, outslot))
    {
-       found = true;
+       /*
+        * Avoid expensive equality check if the index is primary key or
+        * replica identity index.
+        */
+       if (!idxIsRelationIdentityOrPK)
+       {
+           if (eq == NULL)
+           {
+#ifdef USE_ASSERT_CHECKING
+               /* apply assertions only once for the input idxoid */
+               IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
+
+               Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
+#endif
+
+               /*
+                * We only need to allocate once. This is allocated within per
+                * tuple context -- ApplyMessageContext -- hence no need to
+                * explicitly pfree().
+                */
+               eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+           }
+
+           if (!tuples_equal(outslot, searchslot, eq))
+               continue;
+       }

IIRC, it invokes tuples_equal in all cases unless we are using the replica
identity key or primary key to scan. But there seem to be some other cases
where tuples_equal looks unnecessary.

For example, suppose the table on the subscriber doesn't have a PK or RI key
but has a not-null, non-deferrable, unique key. If the apply worker chooses
this index to do the scan, it seems we can skip tuples_equal as well.

--Example
pub:
CREATE TABLE test_replica_id_full (a int, b int not null);
ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;
CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full;

sub:
CREATE TABLE test_replica_id_full (a int, b int not null);
CREATE UNIQUE INDEX test_replica_id_full_idx ON test_replica_id_full(b);
--

I am not 100% sure whether it's worth optimizing this by complicating the check
in idxIsRelationIdentityOrPK. What do you think?

Best Regards,
Hou zj


RE: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full on the publisher

2023-03-02 Thread houzj.f...@fujitsu.com
On Thursday, March 2, 2023 11:23 PM Önder Kalacı   wrote:

> Both the patches are numbered 0001. It would be better to number them
> as 0001 and 0002.
> 
> Alright, attached v27_0001_use_index_on_subs_when_pub_rep_ident_full.patch 
> and 
> v27_0002_use_index_on_subs_when_pub_rep_ident_full.patch.
> 
> I also added one more test which Andres asked me on a private chat
> (Testcase start: SUBSCRIPTION USES INDEX WITH PUB/SUB different data).

Thanks for updating the patch. I think this patch can bring noticeable
performance improvements in some use cases.

Here are a few comments after reading the patch.

1.
+   usableIndexContext = AllocSetContextCreate(CurrentMemoryContext,
+                                              "usableIndexContext",
+                                              ALLOCSET_DEFAULT_SIZES);
+   oldctx = MemoryContextSwitchTo(usableIndexContext);
+
+   /* Get index list of the local relation */
+   indexlist = RelationGetIndexList(localrel);
+   Assert(indexlist != NIL);
+
+   foreach(lc, indexlist)

Is it necessary to create a memory context here? I thought the memory would be
freed soon after this apply action anyway.

2.

+   /*
+    * Furthermore, because primary key and unique key indexes can't
+    * include expressions we also sanity check the index is neither
+    * of those kinds.
+    */
+   Assert(!IdxIsRelationIdentityOrPK(rel, idxrel->rd_id));

It seems you mean "replica identity key" instead of "unique key" in the
comment.


3.
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
...
+extern bool IsIndexOnlyOnExpression(IndexInfo *indexInfo);

It seems better to place the definition of this function in execReplication.c.

4.

+extern Oid GetRelationIdentityOrPK(Relation rel);

The function is only used in relation.c, so we can make it a static
function.


5.

+   /*
+    * If index scans are disabled, use a sequential scan.
+    *
+    * Note that we do not use index scans below when enable_indexscan is
+    * false. Allowing primary key or replica identity even when index scan is
+    * disabled is the legacy behaviour. So we hesitate to move the below
+    * enable_indexscan check to be done earlier in this function.
+    */
+   if (!enable_indexscan)
+   return InvalidOid;

The documentation of enable_indexscan says "Enables or disables the query
planner's use of index-scan plan types. The default is on.", and we don't use
the planner here, so I am not sure whether we should allow/disallow index scans
in the apply worker based on this GUC.

Best Regards,
Hou zj



RE: Rework LogicalOutputPluginWriterUpdateProgress

2023-03-02 Thread houzj.f...@fujitsu.com
On Friday, March 3, 2023 8:18 AM Peter Smith  wrote:
> On Wed, Mar 1, 2023 at 9:16 PM wangw.f...@fujitsu.com
>  wrote:
> >
> > On Tues, Feb 28, 2023 at 9:12 AM Peter Smith 
> wrote:
> > > Here are some comments for the v2-0001 patch.
> > >
> > > (I haven't looked at the v3 that was posted overnight; maybe some of
> > > my comments have already been addressed.)
> >
> > Thanks for your comments.
> >
> > > ==
> > > General
> > >
> > > 1. (Info from the commit message)
> > > Since we can know whether the change is an end of transaction change
> > > in the common code, we removed the
> LogicalDecodingContext->end_xact
> > > introduced in commit f95d53e.
> > >
> > > ~
> > >
> > > TBH, it was not clear to me that this change was an improvement.
> > > IIUC, it removes the "unnecessary" member, but only does that by
> > > replacing it everywhere with a boolean parameter passed to
> > > update_progress_and_keepalive(). So the end result seems no less
> > > code, but it is less readable code now because you need to know what
> > > the true/false parameter means. I wonder if it would have been
> > > better just to leave this how it was.
> >
> > Since I think we can know the meaning of the input based on the
> > parameter name of the function, I think both approaches are fine. But
> > the approach in the current patch can reduce a member of the
> > structure, so I think this modification looks good to me.
> >
> 
...
> 
> Anyway, I think this exposes another problem. If you still want the patch to 
> pass
> the 'finshed_xact' parameter separately then AFAICT the first parameter (ctx)
> now becomes unused/redundant in the WalSndUpdateProgressAndKeepalive
> function, so it ought to be removed.
> 

I am not sure about this. The first parameter (ctx) has been there since the
lag tracking feature was introduced. I think this is to make it consistent with
other LogicalOutputPluginWriter callbacks. In addition, this is a public
callback function and users can implement their own logic in this callback
based on the interface, so removing this existing parameter doesn't look great
to me. This patch does also remove the existing skipped_xact parameter, but
that's because we decided to use another parameter, did_write, which can play
a similar role.

Best Regards,
Hou zj


RE: Support logical replication of DDLs

2023-02-19 Thread houzj.f...@fujitsu.com
On Wed, Feb 15, 2023 at 13:57 PM Amit Kapila  wrote:
> On Fri, Feb 10, 2023 at 8:23 PM Masahiko Sawada 
> 
> wrote:
> >
> > On Thu, Feb 9, 2023 at 6:55 PM Ajin Cherian  wrote:
> > >
> > (v67)
> >
> > I have some questions about adding the infrastructure for DDL deparsing.
> >
> > Apart from the changes made by 0001 patch to add infrastructure for 
> > DDL deparsing, 0002 patch seems to add some variables that are not 
> > used in 0002 patch:
> >
> > @@ -2055,6 +2055,7 @@ typedef struct AlterTableStmt
> >     List       *cmds;           /* list of subcommands */
> >     ObjectType  objtype;        /* type of object */
> >     bool        missing_ok;     /* skip error if table missing */
> > +   bool        table_like;     /* internally generated for TableLikeClause */
> >  } AlterTableStmt;
> >
> > @@ -39,6 +40,7 @@ typedef struct CollectedATSubcmd
> >  {
> >     ObjectAddress address;      /* affected column, constraint, index, ... */
> >     Node       *parsetree;
> > +   char       *usingexpr;
> >  } CollectedATSubcmd;
> >
> >  typedef struct CollectedCommand
> > @@ -62,6 +64,7 @@ typedef struct CollectedCommand
> >     {
> >         Oid         objectId;
> >         Oid         classId;
> > +       bool        rewrite;
> >         List       *subcmds;
> >     }           alterTable;
> >
> > These three variables are used in 0006 patch.
> >
> 
> Hmm, then it should be better to move these to 0006 patch.

Makes sense. Since the variables "table_like" and "usingexpr" are used in the
0002 patch, I only moved "rewrite" to the 0006 patch.

Best Regards,
Hou zj


RE: Support logical replication of DDLs

2023-02-16 Thread houzj.f...@fujitsu.com
On Wednesday, February 15, 2023 5:51 PM Amit Kapila  
wrote:
> 
> On Wed, Feb 15, 2023 at 2:02 PM Alvaro Herrera 
> wrote:
> >
> > On 2023-Feb-15, Peter Smith wrote:
> >
> > > On Thu, Feb 9, 2023 at 8:55 PM Ajin Cherian  wrote:
> > > >
> > > > On Fri, Feb 3, 2023 at 11:41 AM Peter Smith 
> wrote:
> >
> > > > > 3. ExecuteGrantStmt
> > > > >
> > > > > + /* Copy the grantor id needed for DDL deparsing of Grant */
> > > > > + istmt.grantor_uid = grantor;
> > > > >
> > > > > SUGGESTION (comment)
> > > > > Copy the grantor id to the parsetree, needed for DDL deparsing
> > > > > of Grant
> > > >
> > > > didn't change this, as Alvaro said this was not a parsetree.
> > >
> > > Perhaps there is more to do here? Sorry, I did not understand the
> > > details of Alvaro's post [1], but I did not recognize the difference
> > > between ExecuteGrantStmt and ExecSecLabelStmt so it was my
> > > impression either one or both of these places are either wrongly
> > > commented, or maybe are doing something that should not be done.
> >
> > These two cases are different.  In ExecGrantStmt we're adding the
> > grantor OID to the InternalGrant struct, which is not a parse node, so
> > there's no strong reason not to modify it (and also the suggested
> > comment change is factually wrong).  I don't know if the idea is
> > great, but at least I see no strong objection.
> >
> > In the other case, as I said in [1], the patch proposes to edit the
> > parse node to add the grantor, but I think a better idea might be to
> > change the signature to ExecSecLabelStmt(SecLabelStmt *stmt,
> > ObjectAddress *provider) so that the function can set the provider
> > there; and caller passes , which is the method we
> > adopted for this kind of thing.
> >
> 
> +1, that is a better approach to make the required change in
> ExecSecLabelStmt().

I did some research on this.

The provider doesn't seem to be a database object; the user needs to register a
provider in C code via register_label_provider. And ObjectAddress only has
three fields (classId, objectId, objectSubId), so it seems hard to store the
provider name in an ObjectAddress. We cannot get the correct provider name from
the catalog either, because there could be multiple providers for the same db
object.

So, if we don't want to modify the parsetree, maybe we can introduce a function
GetDefaultProvider() which can be used if the user doesn't specify the provider
in the DDL command. GetDefaultProvider would return the first provider in the
providers list, as is done in ExecSecLabelStmt(). What do you think?
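
A rough standalone sketch of the idea, in plain C rather than PostgreSQL code.
The struct and both function names below are hypothetical stand-ins (the real
provider list uses PostgreSQL's own List type), and any error handling that
the real ExecSecLabelStmt() performs is left out.

```c
#include <stddef.h>

/* Hypothetical stand-in for a registered security label provider. */
typedef struct LabelProvider
{
    const char *provider_name;
    struct LabelProvider *next;     /* next provider in registration order */
} LabelProvider;

/*
 * Return the name of the default provider, i.e. the first one in the
 * list, or NULL when no provider has been registered.
 */
const char *
get_default_provider(const LabelProvider *providers)
{
    if (providers == NULL)
        return NULL;
    return providers->provider_name;
}

/* Use the provider named in the command if any, else the default one. */
const char *
resolve_provider(const char *stmt_provider, const LabelProvider *providers)
{
    if (stmt_provider != NULL)
        return stmt_provider;
    return get_default_provider(providers);
}
```

The point of the sketch is only that deparsing could call such a helper when
the command omitted the provider, instead of writing the name back into the
parsetree.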

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-02-14 Thread houzj.f...@fujitsu.com
On Wednesday, February 15, 2023 10:34 AM Amit Kapila  
wrote:
> 
> On Tue, Feb 14, 2023 at 7:45 PM Masahiko Sawada 
> wrote:
> >
> > On Tue, Feb 14, 2023 at 3:58 PM Peter Smith 
> wrote:
> > >
> > > On Tue, Feb 14, 2023 at 5:04 PM Amit Kapila 
> wrote:
> > > >
> > > > On Fri, Feb 10, 2023 at 8:56 AM Peter Smith 
> wrote:
> > > > >
> > > > > My first impression was the
> > > > > WAIT_EVENT_LOGICAL_PARALLEL_APPLY_SEND_DATA name seemed
> > > > > misleading because that makes it sound like the parallel apply
> > > > > worker is doing the sending, but IIUC it's really the opposite.
> > > > >
> > > >
> > > > So, how about WAIT_EVENT_LOGICAL_APPLY_SEND_DATA?
> > > >
> > >
> > > Yes, IIUC all the LR events are named WAIT_EVENT_LOGICAL_xxx.
> > >
> > > So names like the below seem correct format:
> > >
> > > a) WAIT_EVENT_LOGICAL_APPLY_SEND_DATA
> > > b) WAIT_EVENT_LOGICAL_LEADER_SEND_DATA
> > > c) WAIT_EVENT_LOGICAL_LEADER_APPLY_SEND_DATA
> >
> > Personally I'm fine even without "LEADER" in the wait event name since
> > we don't have "who is waiting" in it. IIUC a row of pg_stat_activity
> > shows who, and the wait event name shows "what the process is
> > waiting". So I prefer (a).
> >
> 
> This logic makes sense to me. So, let's go with (a).

OK, here is a patch that changes the event name to
WAIT_EVENT_LOGICAL_APPLY_SEND_DATA.

Best Regards,
Hou zj


v2-0001-Add-a-new-wait-state-and-use-it-when-sending-data.patch
Description:  v2-0001-Add-a-new-wait-state-and-use-it-when-sending-data.patch


RE: Support logical replication of DDLs

2023-02-14 Thread houzj.f...@fujitsu.com
On Tuesday, February 14, 2023 9:44 AM Peter Smith  wrote:
> 
> FYI - the latest patch cannot be applied.
> 

Thanks for reporting. I will post a rebased patch after addressing some of the
comments raised so far (in a day or so).

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-02-09 Thread houzj.f...@fujitsu.com
On Tuesday, February 7, 2023 11:17 AM Amit Kapila  
wrote:
> 
> On Mon, Feb 6, 2023 at 3:43 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > while reading the code, I noticed that in pa_send_data() we set wait
> > event to WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE while
> sending
> > the message to the queue. Because this state is used in multiple
> > places, user might not be able to distinguish what they are waiting
> > for. So It seems we'd better to use WAIT_EVENT_MQ_SEND here which will
> > be eaier to distinguish and understand. Here is a tiny patch for that.
> >

As discussed[1], we'd better invent a new wait state for this purpose, so here
is a patch that does so.

[1] 
https://www.postgresql.org/message-id/CAA4eK1LTud4FLRbS0QqdZ-pjSxwfFLHC1Dx%3D6Q7nyROCvvPSfw%40mail.gmail.com

Best Regards,
Hou zj


0001-Add-new-wait-event-to-be-used-in-apply-worker.patch
Description: 0001-Add-new-wait-event-to-be-used-in-apply-worker.patch


RE: Deadlock between logrep apply worker and tablesync worker

2023-02-06 Thread houzj.f...@fujitsu.com
On Tuesday, February 7, 2023 12:12 PM Peter Smith  wrote:
> On Fri, Feb 3, 2023 at 6:58 PM houzj.f...@fujitsu.com 
> wrote:
> >
> ...
> > > Right, I think that case could be addressed by Tom's patch to some
> > > extent but I am thinking we should also try to analyze if we can
> > > completely avoid the need to remove origins from both processes. One
> > > idea could be to introduce another relstate something like
> > > PRE_SYNCDONE and set it in a separate transaction before we set the
> > > state as SYNCDONE and remove the slot and origin in tablesync worker.
> > > Now, if the tablesync worker errors out due to some reason during
> > > the second transaction, it can remove the slot and origin after restart by
> checking the state.
> > > However, it would add another relstate which may not be the best way
> > > to address this problem. Anyway, that can be accomplished as a separate
> patch.
> >
> > Here is an attempt to achieve the same.
> > Basically, the patch removes the code that drop the origin in apply
> > worker. And add a new state PRE_SYNCDONE after synchronization
> > finished in front of apply (sublsn set), but before dropping the
> > origin and other final cleanups. The tablesync will restart and redo
> > the cleanup if it failed after reaching the new state. Besides, since
> > the changes can already be applied on the table in PRE_SYNCDONE state,
> > so I also modified the check in should_apply_changes_for_rel(). And
> > some other conditions for the origin drop in subscription commands are
> were adjusted in this patch.
> >
> 
> Here are some review comments for the 0001 patch
> 
> ==
> General Comment
> 
> 0.
> The idea of using the extra relstate for clean-up seems OK, but the
> implementation of the new state in this patch appears misordered and
> misnamed to me.
> 
> The state name should indicate what it is doing (PRE_SYNCDONE is
> meaningless). The patch describes in several places that this state means
> "synchronized, but not yet cleaned up" therefore IMO it means the SYNCDONE
> state should be *before* this new state. And since this new state is for
> "cleanup" then let's call it something like that.
> 
> To summarize, I don’t think the meaning of SYNCDONE should be touched.
> SYNCDONE means the synchronization is done, same as before. And your new
> "cleanup" state belongs directly *after* that. IMO it should be like this:
> 
> 1. STATE_INIT
> 2. STATE_DATASYNC
> 3. STATE_FINISHEDCOPY
> 4. STATE_SYNCDONE
> 5. STATE_CLEANUP <-- new relstate
> 6. STATE_READY
> 
> Of course, this is going to impact almost every aspect of the patch, but I 
> think
> everything will be basically the same as you have it now
> -- only all the state names and comments need to be adjusted according to the
> above.

Although I agree that CLEANUP is easier to understand, I am a bit concerned
that the change would be invasive.

If we add a CLEANUP state at the end as suggested, it will change the meaning
of the existing SYNCDONE state: before the change it means that both data sync
and cleanup have been done, but after the change it only means that the data
sync is over. This also means that all the current C code that considers
SYNCDONE the final state of table sync will need to be changed. Moreover, it's
common for users to query the relation state from pg_subscription_rel to
identify whether the table sync of a table is finished (e.g. check relstate IN
('r', 's')), but if we add a new state (CLEANUP) as the final state, then all
these queries would need to be changed to check something like relstate IN
('r', 'x'), where 'x' is the new cleanup state.
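
To illustrate the concern, here is a minimal sketch (not code from the patch)
of how an "is table sync finished" check would have to change. The letters
'i', 'd', 'f', 's' and 'r' are the existing relstate values; 'x' for CLEANUP is
only the hypothetical letter used above, and the function names are made up
for this example.

```c
#include <assert.h>
#include <stdbool.h>

/* Existing relstate letters stored in pg_subscription_rel. */
#define STATE_INIT         'i'
#define STATE_DATASYNC     'd'
#define STATE_FINISHEDCOPY 'f'
#define STATE_SYNCDONE     's'
#define STATE_READY        'r'
/* Hypothetical letter for the proposed CLEANUP state (not in PostgreSQL). */
#define STATE_CLEANUP      'x'

/* Today's meaning: 's' or 'r' implies data sync and cleanup are both done. */
bool
sync_finished_today(char relstate)
{
    /* '\0' means "unknown state"; callers are expected to pass a real one. */
    assert(relstate != '\0');
    return relstate == STATE_SYNCDONE || relstate == STATE_READY;
}

/*
 * With CLEANUP as the final tablesync state, 's' would only mean "data
 * copied", so the same check has to become the equivalent of IN ('r', 'x').
 */
bool
sync_finished_with_cleanup_last(char relstate)
{
    assert(relstate != '\0');
    return relstate == STATE_CLEANUP || relstate == STATE_READY;
}
```

Any caller that still uses the first form after such a change would treat a
table as finished while its slot and origin may not have been removed yet.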

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-02-06 Thread houzj.f...@fujitsu.com
On Monday, February 6, 2023 6:34 PM Kuroda, Hayato wrote:
> > while reading the code, I noticed that in pa_send_data() we set the wait
> > event to WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE while sending
> > the message to the queue. Because this state is used in multiple
> > places, users might not be able to distinguish what they are waiting
> > for. So it seems we'd better use WAIT_EVENT_MQ_SEND here, which will
> > be easier to distinguish and understand. Here is a tiny patch for that.
> 
> In LogicalParallelApplyLoop(), we introduced the new wait event
> WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN, whereas it practically waits on a
> shared message queue and seems to be the same as WAIT_EVENT_MQ_RECEIVE.
> Do you have a policy to reuse the event instead of adding a new event?

I think PARALLEL_APPLY_MAIN waits for two kinds of events: 1) a new message
from the queue, and 2) the partial file state being set. So, I think
introducing a new general event for them is better, and it is also
consistent with WAIT_EVENT_LOGICAL_APPLY_MAIN, which is used in the main
loop of the leader apply worker (LogicalRepApplyLoop). But the event in
pa_send_data() is only for sending messages, so it seems fine to use
WAIT_EVENT_MQ_SEND. Besides, MQ_SEND is also unique in the parallel apply
worker, so users can distinguish it without adding a new event.

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-02-06 Thread houzj.f...@fujitsu.com
Hi, 

while reading the code, I noticed that in pa_send_data() we set the wait event
to WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE while sending the
message to the queue. Because this state is used in multiple places, users might
not be able to distinguish what they are waiting for. So it seems we'd better
use WAIT_EVENT_MQ_SEND here, which will be easier to distinguish and
understand. Here is a tiny patch for that.

Best Regards,
Hou zj


0001-Use-appropriate-wait-event-when-sending-data.patch
Description: 0001-Use-appropriate-wait-event-when-sending-data.patch


RE: Deadlock between logrep apply worker and tablesync worker

2023-02-02 Thread houzj.f...@fujitsu.com
On Thursday, February 2, 2023 7:21 PM Amit Kapila wrote:
> 
> On Thu, Feb 2, 2023 at 12:05 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Tuesday, January 31, 2023 1:07 AM vignesh C 
> wrote:
> > > On Mon, 30 Jan 2023 at 17:30, vignesh C  wrote:
> > >
> >
> > I also tried to test the time of "src/test/subscription/t/002_types.pl"
> > before and after the patch(change the lock level) and Tom's
> > patch(split transaction) like what Vignesh has shared on -hackers.
> >
> > I run about 100 times for each case. Tom's and the lock level patch
> > behave similarly on my machines[1].
> >
> > HEAD: 3426 ~ 6425 ms
> > HEAD + Tom: 3404 ~ 3462 ms
> > HEAD + Vignesh: 3419 ~ 3474 ms
> > HEAD + Tom + Vignesh: 3408 ~ 3454 ms
> >
> > Even apart from the testing time reduction, reducing the lock level
> > and lock the specific object can also help improve the lock contention
> > which user(that use the exposed function) , table sync worker and
> > apply worker can also benefit from it. So, I think pushing the patch to
> > change the lock level makes sense.
> >
> > And the patch looks good to me.
> >
> 
> Thanks for the tests. I also see a reduction in test time variability with
> Vignesh's patch. I think we can release the locks in case the origin is
> concurrently dropped as in the attached patch. I am planning to commit this
> patch tomorrow unless there are more comments or objections.
> 
> > While on it, after pushing the patch, I think there is another case
> > might also worth to be improved, that is the table sync and apply
> > worker try to drop the same origin which might cause some delay. This
> > is another case(different from the deadlock), so I feel we can try to
> > improve this in another patch.
> >
> 
> Right, I think that case could be addressed by Tom's patch to some extent but
> I am thinking we should also try to analyze if we can completely avoid the
> need to remove origins from both processes. One idea could be to introduce
> another relstate something like PRE_SYNCDONE and set it in a separate
> transaction before we set the state as SYNCDONE and remove the slot and
> origin in tablesync worker.
> Now, if the tablesync worker errors out due to some reason during the second
> transaction, it can remove the slot and origin after restart by checking the
> state.
> However, it would add another relstate which may not be the best way to
> address this problem. Anyway, that can be accomplished as a separate patch.

Here is an attempt to achieve the same.
Basically, the patch removes the code that drops the origin in the apply
worker, and adds a new state PRE_SYNCDONE that is set after synchronization
with the apply worker has finished (sublsn set), but before dropping the
origin and doing other final cleanups. The tablesync worker will restart and
redo the cleanup if it failed after reaching the new state. Besides, since
changes can already be applied on the table in PRE_SYNCDONE state, I also
modified the check in should_apply_changes_for_rel(). Some other conditions
for the origin drop in subscription commands were also adjusted in this patch.

Best Regards,
Hou zj


0001-Avoid-dropping-origins-from-both-apply-and-tablesync.patch
Description:  0001-Avoid-dropping-origins-from-both-apply-and-tablesync.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-02-02 Thread houzj.f...@fujitsu.com
On Friday, February 3, 2023 11:04 AM Amit Kapila wrote:
> 
> On Thu, Feb 2, 2023 at 4:52 AM Peter Smith 
> wrote:
> >
> > Some minor review comments for v91-0001
> >
> 
> Pushed this yesterday after addressing your comments!

Thanks for pushing.

Currently, we have two remaining patches which we are not sure are worth
committing for now. I am just sharing them here for reference.

0001:

Based on our discussion[1] on -hackers, it's not clear whether it's necessary
to add the sub-feature to stop extra workers when
max_parallel_apply_workers_per_subscription is reduced. Because:

- it's not clear whether reducing 'max_parallel_apply_workers_per_subscription'
  is very common.
- even when the GUC is reduced, at that point in time all the workers might be
  in use so there may be nothing that can be immediately done.
- IIUC the excess workers (for a reduced GUC) are going to get freed naturally
  anyway over time as more transactions are completed so the pool size will
  reduce accordingly.

And given that the logic of this patch is simple, it would be easy to add this
at a later point if we really see a use case for this.

0002:

All the deadlock errors and other errors caused by parallel streaming
will be logged, so the user can check for this kind of ERROR and disable the
parallel streaming mode to resolve it. Besides, for this retry feature, it
will be hard to distinguish whether the ERROR is caused by parallel streaming,
and we might need to retry in serialize mode for all kinds of ERROR. So, it's
not very clear whether automatically retrying in serialize mode in case of any
ERROR in parallel streaming is necessary. We can also add this when we see a
use case.

[1] https://www.postgresql.org/message-id/CAA4eK1LotEuPsteuJMNpixxTj6R4B8k93q-6ruRmDzCxKzMNpA%40mail.gmail.com

Best Regards,
Hou zj


v92-0001-Stop-extra-worker-if-GUC-was-changed.patch
Description: v92-0001-Stop-extra-worker-if-GUC-was-changed.patch


v92-0002-Retry-to-apply-streaming-xact-only-in-apply-work.patch
Description:  v92-0002-Retry-to-apply-streaming-xact-only-in-apply-work.patch


RE: Deadlock between logrep apply worker and tablesync worker

2023-02-01 Thread houzj.f...@fujitsu.com
On Tuesday, January 31, 2023 1:07 AM vignesh C  wrote:
> On Mon, 30 Jan 2023 at 17:30, vignesh C  wrote:
> >
> > On Mon, 30 Jan 2023 at 13:00, houzj.f...@fujitsu.com
> >  wrote:
> > >
> > > On Monday, January 30, 2023 2:32 PM Amit Kapila
>  wrote:
> > > >
> > > > On Mon, Jan 30, 2023 at 9:20 AM vignesh C 
> wrote:
> > > > >
> > > > > On Sat, 28 Jan 2023 at 11:26, Amit Kapila 
> wrote:
> > > > > >
> > > > > > > One thing that looks a bit odd is that we will anyway have a
> > > > > > > similar check in replorigin_drop_guts() which is a static
> > > > > > > function and called from only one place, so, will it be required
> > > > > > > to check at both places?
> > > > >
> > > > > There is a possibility that the initial check to verify if
> > > > > replication origin exists in replorigin_drop_by_name was
> > > > > successful but later one of either table sync worker or apply
> > > > > worker process might have dropped the replication origin,
> > > > >
> > > >
> > > > Won't locking on the particular origin prevent concurrent drops?
> > > > IIUC, the drop happens after the patch acquires the lock on the origin.
> > >
> > > > Yes, I think the existence check in replorigin_drop_guts is
> > > > unnecessary as we already lock the origin before that. I think the
> > > > check in replorigin_drop_guts is a custom check after calling
> > > > SearchSysCache1 to get the tuple, but the error should not happen as
> > > > no concurrent drop can be performed.
> > >
> > > To make it simpler, one idea is to move the code that getting the
> > > tuple from system cache to the replorigin_drop_by_name(). After
> > > locking the origin, we can try to get the tuple and do the existence
> > > check, and we can reuse this tuple to perform origin delete. In this
> > > approach we only need to check origin existence once after locking.
> > > BTW, if we do this, then we'd better rename the
> > > replorigin_drop_guts() to something like replorigin_state_clear() as
> > > the function only clear the in-memory information after that.
> > >
> >
> > The attached updated patch has the changes to handle the same.
> 
> I had not merged one of the local changes that was present, please find the
> updated patch including that change. Sorry for missing that change.
> 

I also tried to test the time of "src/test/subscription/t/002_types.pl"
before and after the patch (change the lock level) and Tom's patch (split
transaction), like what Vignesh has shared on -hackers.

I ran each case about 100 times. Tom's patch and the lock level patch
behave similarly on my machine[1].

HEAD: 3426 ~ 6425 ms
HEAD + Tom: 3404 ~ 3462 ms
HEAD + Vignesh: 3419 ~ 3474 ms
HEAD + Tom + Vignesh: 3408 ~ 3454 ms

Even apart from the testing time reduction, reducing the lock level and
locking the specific object can also help reduce lock contention, which
benefits users (who use the exposed function), the table sync worker and the
apply worker. So, I think pushing the patch to change the lock level makes
sense.

And the patch looks good to me.

While on it, after pushing the patch, I think there is another case that might
also be worth improving: the table sync worker and the apply worker try to drop
the same origin, which might cause some delay. This is another case (different
from the deadlock), so I feel we can try to improve this in another patch.

[1] CentOS 8.2, 128G RAM, 40 processors Intel(R) Xeon(R) Silver 4210 CPU @ 2.20GHz

Best Regards,
Hou zj


RE: pub/sub - specifying optional parameters without values.

2023-01-31 Thread houzj.f...@fujitsu.com
On Tuesday, January 31, 2023 10:49 PM Tom Lane  wrote:

Hi,

> Amit Kapila  writes:
> > On Tue, Jan 31, 2023 at 4:25 AM Tom Lane  wrote:
> >> Hmph.  I generally think that options defined like this (it's a
> >> boolean, except it isn't) are a bad idea, and would prefer to see
> >> that API rethought while we still can.
> 
> > We have discussed this during development and considered using a
> > separate option like parallel = on (or say parallel_workers = n) but
> > there were challenges with the same. See discussion in email [1]. We
> > also checked that we have various other places using something similar
> > for options. For example COPY commands option: HEADER [ boolean |
> > MATCH ].
> 
> Yeah, and it's bad experiences with the existing cases that make me not want
> to add more.  Every one of those was somebody taking the easy way out.  It
> generally leads to parsing oddities, such as not accepting all the same
> spellings of "boolean" as before.

I understand the worry about parsing oddities. I think we have tried to make
the streaming option keep accepting all the same spellings of boolean (e.g.
the option still accepts 1/0/true/false...). And this is similar to some other
options, like the COPY HEADER option, which accepts all the boolean values and
the 'match' value. Some GUCs like wal_compression also behave similarly:
0/1/true/false/on/off/lz4/pglz are all valid values.
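
As a rough sketch of what "boolean spellings plus one extra value" means for a
parser, something like the following could be used. This is a simplified
stand-in, not the server's actual option parsing code: the enum, the function
names and the exact list of accepted words are assumptions for illustration,
and it only accepts full spellings.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>

/* Hypothetical result type for a "boolean or 'parallel'" option. */
typedef enum
{
    STREAM_OFF,
    STREAM_ON,
    STREAM_PARALLEL,
    STREAM_INVALID
} StreamMode;

/* Case-insensitive string equality without relying on POSIX strcasecmp(). */
static int
equals_ignore_case(const char *a, const char *b)
{
    while (*a && *b)
    {
        if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
            return 0;
        a++;
        b++;
    }
    return *a == *b;            /* equal only if both strings ended */
}

/* Try the boolean spellings first, then the extra non-boolean keyword. */
StreamMode
parse_streaming_option(const char *value)
{
    static const char *const true_words[] = {"true", "on", "yes", "1"};
    static const char *const false_words[] = {"false", "off", "no", "0"};
    size_t      i;

    assert(value != NULL);

    for (i = 0; i < sizeof(true_words) / sizeof(true_words[0]); i++)
        if (equals_ignore_case(value, true_words[i]))
            return STREAM_ON;

    for (i = 0; i < sizeof(false_words) / sizeof(false_words[0]); i++)
        if (equals_ignore_case(value, false_words[i]))
            return STREAM_OFF;

    if (equals_ignore_case(value, "parallel"))
        return STREAM_PARALLEL;

    return STREAM_INVALID;
}
```

The point is only that the boolean spellings are tried first, so adding the
extra keyword does not take away any spelling that worked before.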

Best Regards,
Hou zj





RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-30 Thread houzj.f...@fujitsu.com
On Tuesday, January 31, 2023 8:23 AM Peter Smith  wrote:
> 
> On Mon, Jan 30, 2023 at 5:23 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Monday, January 30, 2023 12:13 PM Peter Smith
>  wrote:
> > >
> > > Here are my review comments for v88-0002.
> >
> > Thanks for your comments.
> >
> > >
> > > ==
> > > General
> > >
> > > 1.
> > > The test cases are checking the log content but they are not
> > > checking for debug logs or untranslated elogs -- they are expecting
> > > a normal ereport LOG that might be translated. I’m not sure if that is
> > > OK, or if it is a potential problem.
> >
> > We have tests that check the ereport ERROR and ereport WARNING
> > message(by search for the ERROR or WARNING keyword for all the tap
> > tests), so I think checking the LOG should be fine.
> >
> > > ==
> > > doc/src/sgml/config.sgml
> > >
> > > 2.
> > > On the publisher side, logical_replication_mode allows allows
> > > streaming or serializing changes immediately in logical decoding.
> > > When set to immediate, stream each change if streaming option (see
> > > optional parameters set by CREATE SUBSCRIPTION) is enabled,
> > > otherwise, serialize each change. When set to buffered, the decoding
> > > will stream or serialize changes when logical_decoding_work_mem is
> > > reached.
> > >
> > > 2a.
> > > typo "allows allows"  (Kuroda-san reported same)
> > >
> > > 2b.
> > > "if streaming option" --> "if the streaming option"
> >
> > Changed.
> 
> Although you replied "Changed" for the above, AFAICT my review comment
> #2b. was accidentally missed.

Fixed.

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-30 Thread houzj.f...@fujitsu.com
On Monday, January 30, 2023 10:20 PM Masahiko Sawada wrote:
> 
> 
> I have one comment on v89 patch:
> 
> +   /*
> +    * Using 'immediate' mode returns false to cause a switch to
> +    * PARTIAL_SERIALIZE mode so that the remaining changes will be
> +    * serialized.
> +    */
> +   if (logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE)
> +       return false;
> +
> 
> Probably we might want to add unlikely() here since we could pass through this
> path very frequently?

I think your comment makes sense, thanks.
I updated the patch for the same.
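
For reference, a minimal standalone sketch of the kind of branch hint being
discussed. The macro mirrors the usual __builtin_expect idiom; the enum and
the function name are hypothetical stand-ins for this example, not the names
used in the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Branch hint: tell the compiler the condition is expected to be false. */
#if defined(__GNUC__)
#define unlikely(x) __builtin_expect((x) != 0, 0)
#else
#define unlikely(x) ((x) != 0)
#endif

/* Hypothetical stand-in for the two GUC values. */
typedef enum
{
    MODE_BUFFERED,
    MODE_IMMEDIATE
} RepMode;

/*
 * Returns false when the caller should fall back to serializing changes.
 * 'immediate' is a developer/testing setting, so that branch is marked
 * as unlikely for the hot path.
 */
bool
try_send_via_queue(RepMode mode)
{
    assert(mode == MODE_BUFFERED || mode == MODE_IMMEDIATE);

    if (unlikely(mode == MODE_IMMEDIATE))
        return false;

    return true;
}
```

The hint does not change the result of the check; it only affects how the
compiler lays out the two branches.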

Best Regards,
Hou zj


v90-0001-Extend-the-logical_replication_mode-to-test-the-.patch
Description:  v90-0001-Extend-the-logical_replication_mode-to-test-the-.patch


RE: Deadlock between logrep apply worker and tablesync worker

2023-01-29 Thread houzj.f...@fujitsu.com
On Monday, January 30, 2023 2:32 PM Amit Kapila  wrote:
> 
> On Mon, Jan 30, 2023 at 9:20 AM vignesh C  wrote:
> >
> > On Sat, 28 Jan 2023 at 11:26, Amit Kapila  wrote:
> > >
> > > One thing that looks a bit odd is that we will anyway have a similar
> > > check in replorigin_drop_guts() which is a static function and
> > > called from only one place, so, will it be required to check at both
> > > places?
> >
> > There is a possibility that the initial check to verify if replication
> > origin exists in replorigin_drop_by_name was successful but later one
> > of either table sync worker or apply worker process might have dropped
> > the replication origin,
> >
> 
> Won't locking on the particular origin prevent concurrent drops? IIUC, the
> drop happens after the patch acquires the lock on the origin.

Yes, I think the existence check in replorigin_drop_guts is unnecessary as we
already lock the origin before that. I think the check in replorigin_drop_guts
is a custom check after calling SearchSysCache1 to get the tuple, but the error
should not happen as no concurrent drop can be performed.

To make it simpler, one idea is to move the code that gets the tuple from the
system cache into replorigin_drop_by_name(). After locking the origin, we
can try to get the tuple and do the existence check, and we can reuse
this tuple to perform the origin delete. With this approach we only need to
check origin existence once, after locking. BTW, if we do this, then we'd
better rename replorigin_drop_guts() to something like
replorigin_state_clear(), as the function would then only clear the in-memory
information.

The code could be like:

---
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
...
    /*
     * Lock the origin to prevent concurrent drops. We keep the lock until
     * the end of transaction.
     */
    LockSharedObject(ReplicationOriginRelationId, roident, 0,
                     AccessExclusiveLock);

    tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
    if (!HeapTupleIsValid(tuple))
    {
        if (!missing_ok)
            elog(ERROR, "cache lookup failed for replication origin with ID %d",
                 roident);

        return;
    }

    replorigin_state_clear(rel, roident, nowait);

    /*
     * Now, we can delete the catalog entry.
     */
    CatalogTupleDelete(rel, &tuple->t_self);
    ReleaseSysCache(tuple);

    CommandCounterIncrement();
...
---

Best Regards,
Hou zj



RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-29 Thread houzj.f...@fujitsu.com
On Thursday, January 26, 2023 11:37 AM Kuroda, Hayato/黒田 隼人 wrote:
> 
> Followings are comments.

Thanks for the comments.

> In this test the rollback-prepared seems not to be executed. This is because
> serializations are finished while handling PREPARE message and the final
> state of transaction does not affect that, right? I think it may be helpful
> to add a one line comment.

Yes, but I am slightly unsure whether it would be helpful to add this, as we
only test basic cases (mainly for code coverage) for partial serialization.

> 
> 1. config.sgml
> 
> ```
> +the changes till logical_decoding_work_mem is reached. It can also
> be
> ```
> 
> I think it should be sandwiched by .

Added.

> 
> 2. config.sgml
> 
> ```
> +On the publisher side,
> logical_replication_mode allows
> +allows streaming or serializing changes immediately in logical
> decoding.
> ```
> 
> Typo "allows allows" -> "allows"

Fixed.

> 3. test general
> 
> You confirmed that the leader started to serialize changes, but did not ensure
> the endpoint.
> IIUC the parallel apply worker exits after applying serialized changes, and
> it is not tested yet.
> Can we add polling the log somewhere?

I checked other tests and didn't find any examples where we test the exit of
an apply worker or table sync worker. And if the parallel apply worker doesn't
stop in this case, we will fail anyway when reusing this worker to handle the
next transaction, because the queue is broken. So, I prefer to keep the tests
short.

> 4. 015_stream.pl
> 
> ```
> +is($result, qq(15000), 'all changes are replayed from file')
> ```
> 
> The statement may be unclear because changes can be also replicated when
> streaming = on.
> How about: "parallel apply worker replayed all changes from file"?

Changed.

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-29 Thread houzj.f...@fujitsu.com
On Monday, January 30, 2023 12:13 PM Peter Smith  wrote:
> 
> Here are my review comments for v88-0002.

Thanks for your comments.

> 
> ==
> General
> 
> 1.
> The test cases are checking the log content but they are not checking for
> debug logs or untranslated elogs -- they are expecting a normal ereport LOG
> that might be translated. I’m not sure if that is OK, or if it is a potential 
> problem.

We have tests that check the ereport ERROR and ereport WARNING message(by
search for the ERROR or WARNING keyword for all the tap tests), so I think
checking the LOG should be fine.

> ==
> doc/src/sgml/config.sgml
> 
> 2.
> On the publisher side, logical_replication_mode allows allows streaming or
> serializing changes immediately in logical decoding. When set to immediate,
> stream each change if streaming option (see optional parameters set by
> CREATE SUBSCRIPTION) is enabled, otherwise, serialize each change. When set
> to buffered, the decoding will stream or serialize changes when
> logical_decoding_work_mem is reached.
> 
> 2a.
> typo "allows allows"  (Kuroda-san reported same)
> 
> 2b.
> "if streaming option" --> "if the streaming option"

Changed.

> ~~~
> 
> 3.
> On the subscriber side, if streaming option is set to parallel,
> logical_replication_mode also allows the leader apply worker to send changes
> to the shared memory queue or to serialize changes.
> 
> SUGGESTION
> On the subscriber side, if the streaming option is set to parallel,
> logical_replication_mode can be used to direct the leader apply worker to
> send changes to the shared memory queue or to serialize changes.

Changed.

> ==
> src/backend/utils/misc/guc_tables.c
> 
> 4.
>   {
>   {"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS,
> - gettext_noop("Controls when to replicate each change."),
> - gettext_noop("On the publisher, it allows streaming or serializing each
> change in logical decoding."),
> + gettext_noop("Controls the internal behavior of logical replication
> publisher and subscriber"),
> + gettext_noop("On the publisher, it allows streaming or "
> + "serializing each change in logical decoding. On the "
> + "subscriber, in parallel streaming mode, it allows "
> + "the leader apply worker to serialize changes to "
> + "files and notifies the parallel apply workers to "
> + "read and apply them at the end of the transaction."),
>   GUC_NOT_IN_SAMPLE
>   },
> Suggest re-wording the long description (subscriber part) to be more like the
> documentation text.
> 
> BEFORE
> On the subscriber, in parallel streaming mode, it allows the leader apply
> worker to serialize changes to files and notifies the parallel apply workers
> to read and apply them at the end of the transaction.
> 
> SUGGESTION
> On the subscriber, if the streaming option is set to parallel, it directs the
> leader apply worker to send changes to the shared memory queue or to serialize
> changes and apply them at the end of the transaction.
> 

Changed.

Attached is the new version of the patch, which addresses all comments so far
(v88-0001 has been committed, so we only have one remaining patch this time).

Best Regards,
Hou zj


v89-0001-Extend-the-logical_replication_mode-to-test-the-.patch
Description:  v89-0001-Extend-the-logical_replication_mode-to-test-the-.patch


RE: Deadlock between logrep apply worker and tablesync worker

2023-01-27 Thread houzj.f...@fujitsu.com
On Friday, January 27, 2023 8:16 PM Amit Kapila 
> 
> On Fri, Jan 27, 2023 at 3:45 PM vignesh C  wrote:
> >
> > On Mon, 23 Jan 2023 at 10:52, Amit Kapila 
> wrote:
> > >
> > > IIRC, this is done to prevent concurrent drops of origin drop say by
> > > exposed API pg_replication_origin_drop(). See the discussion in [1]
> > > related to it. If we want we can optimize it so that we can acquire
> > > the lock on the specific origin as mentioned in comments
> > > replorigin_drop_by_name() but it was not clear that this operation
> > > would be frequent enough.
> >
> > Here is an attached patch to lock the replication origin record using
> > LockSharedObject instead of locking pg_replication_origin relation in
> > ExclusiveLock mode. Now tablesync worker will wait only if the
> > tablesync worker is trying to drop the same replication origin which
> > has already been dropped by the apply worker, the other tablesync
> > workers will be able to successfully drop the replication origin
> > without any wait.
> >
> 
> There is a code in the function replorigin_drop_guts() that uses the
> functionality introduced by replorigin_exists(). Can we reuse this function
> for the same?

Maybe we can use SearchSysCacheExists1 to check the existence instead of
adding a new function.

One comment about the patch.

@@ -430,23 +445,21 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
...
+   /* Drop the replication origin if it has not been dropped already */
+   if (replorigin_exists(roident))
        replorigin_drop_guts(rel, roident, nowait);

If the developer passes missing_ok as false, should we report an ERROR here
instead of silently returning?

Best Regards,
Hou zj


RE: Logical replication timeout problem

2023-01-27 Thread houzj.f...@fujitsu.com
On Wednesday, January 25, 2023 7:26 PM Amit Kapila 
> 
> On Tue, Jan 24, 2023 at 8:15 AM wangw.f...@fujitsu.com
>  wrote:
> >
> > Attach the new patch.
> >
> 
> I think the patch missed to handle the case of non-transactional messages
> which was previously getting handled. I have tried to address that in the
> attached. Is there a reason that shouldn't be handled?

Thanks for updating the patch!

I thought about the non-transactional message. I think it seems fine if we
don’t handle it for timeout because such message is decoded via:

WalSndLoop
-XLogSendLogical
--LogicalDecodingProcessRecord
---logicalmsg_decode
----ReorderBufferQueueMessage
-----rb->message() -- //maybe send the message or do nothing here.

After invoking rb->message(), we will directly return to the main
loop(WalSndLoop) where we will get a chance to call
WalSndKeepaliveIfNecessary() to avoid the timeout.

This is a bit different from transactional changes, because for transactional
changes we will buffer them and then send every buffered change one by one (via
ReorderBufferProcessTXN) without going back to WalSndLoop, so we don't get
a chance to send a keepalive message if necessary, which is more likely to
cause the timeout problem.

I will also test the non-transactional message for timeout in case I missed
something.

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-25 Thread houzj.f...@fujitsu.com
On Wednesday, January 25, 2023 7:30 AM Peter Smith wrote:
> 
> Here are my review comments for patch v87-0002.

Thanks for your comments.

> ==
> doc/src/sgml/config.sgml
> 
> 1.
> 
> -Allows streaming or serializing changes immediately in
> logical decoding.
>  The allowed values of
> logical_replication_mode are
> -buffered and immediate. When
> set
> -to immediate, stream each change if
> +buffered and immediate.
> The default
> +is buffered.
> +   
> 
> I didn't think it was necessary to say “of logical_replication_mode”.
> IMO that much is already obvious because this is the first sentence of the
> description for logical_replication_mode.
> 

Changed.

> ~~~
> 
> 2.
> +   
> +On the publisher side, it allows streaming or serializing changes
> +immediately in logical decoding.  When set to
> +immediate, stream each change if
>  streaming option (see optional parameters set by
>  CREATE
> SUBSCRIPTION)
>  is enabled, otherwise, serialize each change.  When set to
> -buffered, which is the default, decoding will 
> stream
> -or serialize changes when
> logical_decoding_work_mem
> -is reached.
> +buffered, decoding will stream or serialize 
> changes
> +when logical_decoding_work_mem is
> reached.
> 
> 
> 2a.
> "it allows" --> "logical_replication_mode allows"
> 
> 2b.
> "decoding" --> "the decoding"

Changed.

> ~~~
> 
> 3.
> +   
> +On the subscriber side, if streaming option is set
> +to parallel, this parameter also allows the leader
> +apply worker to send changes to the shared memory queue or to
> serialize
> +changes.  When set to buffered, the leader sends
> +changes to parallel apply workers via shared memory queue.  When
> set to
> +immediate, the leader serializes all changes to
> +files and notifies the parallel apply workers to read and apply them 
> at
> +the end of the transaction.
> +   
> 
> "this parameter also allows" --> "logical_replication_mode also allows"

Changed.

> ~~~
> 
> 4.
> 
>  This parameter is intended to be used to test logical decoding and
>  replication of large transactions for which otherwise we need to
>  generate the changes till
> logical_decoding_work_mem
> -is reached.
> +is reached. Moreover, this can also be used to test the transmission 
> of
> +changes between the leader and parallel apply workers.
> 
> 
> "Moreover, this can also" --> "It can also"
> 
> I am wondering would this sentence be better put at the top of the GUC
> description. So then the first paragraph becomes like this:
> 
> 
> SUGGESTION (I've also added another sentence "The effect of...")
> 
> The allowed values are buffered and immediate. The default is buffered. This
> parameter is intended to be used to test logical decoding and replication of
> large transactions for which otherwise we need to generate the changes till
> logical_decoding_work_mem is reached. It can also be used to test the
> transmission of changes between the leader and parallel apply workers. The
> effect of logical_replication_mode is different for the publisher and
> subscriber:
> 
> On the publisher side...
> 
> On the subscriber side...

I think your suggestion makes sense, so changed as suggested.

> ==
> .../replication/logical/applyparallelworker.c
> 
> 5.
> + /*
> + * In immeidate mode, directly return false so that we can switch to
> + * PARTIAL_SERIALIZE mode and serialize remaining changes to files.
> + */
> + if (logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE) return
> + false;
> 
> Typo "immediate"
> 
> Also, I felt "directly" is not needed. "return false" and "directly return
> false" is the same.
> 
> SUGGESTION
> Using ‘immediate’ mode returns false to cause a switch to PARTIAL_SERIALIZE
> mode so that the remaining changes will be serialized.

Changed.

> ==
> src/backend/utils/misc/guc_tables.c
> 
> 6.
>   {
>   {"logical_replication_mode", PGC_USERSET, DEVELOPER_OPTIONS,
> - gettext_noop("Allows streaming or serializing each change in logical
> decoding."),
> - NULL,
> + gettext_noop("Controls the behavior of logical replication publisher
> and subscriber"),
> + gettext_noop("If set to immediate, on the publisher side, it "
> + "allows streaming or serializing each change in "
> + "logical decoding. On the subscriber side, in "
> + "parallel streaming mode, it allows the leader apply "
> + "worker to serialize changes to files and notifies "
> + "the parallel apply workers to read and apply them at "
> + "the end of the transaction."),
>   GUC_NOT_IN_SAMPLE
>   },
> 
> 6a. short description
> 
> User PoV behaviour should be the same. Instead, maybe say "controls the
> internal behavior" or something like that?

Changed to "internal behavior xxx"

> ~
> 
> 6b. long description
> 

RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-24 Thread houzj.f...@fujitsu.com
On Tuesday, January 24, 2023 8:47 PM Hou, Zhijie wrote:
> 
> On Tuesday, January 24, 2023 3:19 PM Peter Smith 
> wrote:
> >
> > Here are some review comments for v86-0002
> >

Sorry, the patch set was somehow attached twice. Here is the correct new version
patch set which addressed all comments so far.

Best Regards,
Hou zj


v87-0001-Rename-logical_decoding_mode-to-logical_replicat.patch
Description:  v87-0001-Rename-logical_decoding_mode-to-logical_replicat.patch


v87-0002-Extend-the-logical_replication_mode-to-test-the-.patch
Description:  v87-0002-Extend-the-logical_replication_mode-to-test-the-.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-24 Thread houzj.f...@fujitsu.com
On Monday, January 23, 2023 8:34 PM Kuroda, Hayato wrote:
> 
> Followings are my comments.

Thanks for your comments.

> 
> 1. guc_tables.c
> 
> ```
>  static const struct config_enum_entry logical_decoding_mode_options[] = {
> -   {"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},
> -   {"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
> +   {"buffered", LOGICAL_REP_MODE_BUFFERED, false},
> +   {"immediate", LOGICAL_REP_MODE_IMMEDIATE, false},
> {NULL, 0, false}
>  };
> ```
> 
> This struct should be also modified.

Modified.

> 
> 2. guc_tables.c
> 
> 
> ```
> -   {"logical_decoding_mode", PGC_USERSET,
> DEVELOPER_OPTIONS,
> +   {"logical_replication_mode", PGC_USERSET,
> + DEVELOPER_OPTIONS,
> gettext_noop("Allows streaming or serializing each
> change in logical decoding."),
> NULL,
> ```
> 
> I felt the description seems not to be suitable for current behavior.
> A short description should be like "Sets a behavior of logical replication", 
> and
> further descriptions can be added in lond description.

I adjusted the description here.

> 3. config.sgml
> 
> ```
>
> This parameter is intended to be used to test logical decoding and
> replication of large transactions for which otherwise we need to
> generate the changes till
> logical_decoding_work_mem
> is reached.
>
> ```
> 
> I understood that this part described the usage of the parameter. How about
> adding a statement like:
> 
> " Moreover, this can be also used to test the message passing between the
> leader and parallel apply workers."

Added.

> 4. 015_stream.pl
> 
> ```
> +# Ensure that the messages are serialized.
> ```
> 
> In other parts "changes" are used instead of "messages". Can you change the
> word?

Changed.

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-24 Thread houzj.f...@fujitsu.com
On Tuesday, January 24, 2023 11:43 AM Peter Smith  wrote:

> 
> Here are my review comments for patch v86-0001.

Thanks for your comments.

> 
> 
> ==
> Commit message
> 
> 2.
> Since we may extend the developer option logical_decoding_mode to to test the
> parallel apply of large transaction on subscriber, rename this option to
> logical_replication_mode to make it easier to understand.
> 
> ~
> 
> 2a
> typo "to to"
> 
> typo "large transaction on subscriber" --> "large transactions on the 
> subscriber"
> 
> ~
> 
> 2b.
> IMO better to rephrase the whole paragraph like shown below.
> 
> SUGGESTION
> 
> Rename the developer option 'logical_decoding_mode' to the more flexible
> name 'logical_replication_mode' because doing so will make it easier to extend
> this option in future to help test other areas of logical replication.

Changed.

> ==
> doc/src/sgml/config.sgml
> 
> 3.
> Allows streaming or serializing changes immediately in logical decoding. The
> allowed values of logical_replication_mode are buffered and immediate. When
> set to immediate, stream each change if streaming option (see optional
> parameters set by CREATE SUBSCRIPTION) is enabled, otherwise, serialize each
> change. When set to buffered, which is the default, decoding will stream or
> serialize changes when logical_decoding_work_mem is reached.
> 
> ~
> 
> IMO it's more clear to say the default when the options are first mentioned. 
> So I
> suggested removing the "which is the default" part, and instead saying:
> 
> BEFORE
> The allowed values of logical_replication_mode are buffered and immediate.
> 
> AFTER
> The allowed values of logical_replication_mode are buffered and immediate. The
> default is buffered.

I included this change in the 0002 patch.

> ==
> src/backend/utils/misc/guc_tables.c
> 
> 4.
> @@ -396,8 +396,8 @@ static const struct config_enum_entry
> ssl_protocol_versions_info[] = {  };
> 
>  static const struct config_enum_entry logical_decoding_mode_options[] = {
> - {"buffered", LOGICAL_DECODING_MODE_BUFFERED, false},
> - {"immediate", LOGICAL_DECODING_MODE_IMMEDIATE, false},
> + {"buffered", LOGICAL_REP_MODE_BUFFERED, false},
> + {"immediate", LOGICAL_REP_MODE_IMMEDIATE, false},
>   {NULL, 0, false}
>  };
> 
> I noticed this array is still called "logical_decoding_mode_options".
> Was that deliberate?

No, I didn't notice this one. Changed.
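
To illustrate what the renamed array does, here is a minimal standalone sketch of an option table and a name lookup. It is not the guc_tables.c code: the struct layout, the array name logical_replication_mode_options and the helper lookup_mode() are assumptions made for this example only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Stand-ins for the renamed constants discussed above. */
typedef enum
{
    LOGICAL_REP_MODE_BUFFERED,
    LOGICAL_REP_MODE_IMMEDIATE
} LogicalRepMode;

/* Simplified shape of an enum option entry: name, value, hidden flag. */
struct enum_entry
{
    const char *name;
    int         val;
    int         hidden;
};

/* Assumed new name for the array, matching the renamed option. */
static const struct enum_entry logical_replication_mode_options[] = {
    {"buffered", LOGICAL_REP_MODE_BUFFERED, 0},
    {"immediate", LOGICAL_REP_MODE_IMMEDIATE, 0},
    {NULL, 0, 0}
};

/* Hypothetical helper: returns 1 and sets *val if name is a known option. */
static int
lookup_mode(const char *name, int *val)
{
    const struct enum_entry *e;

    assert(name != NULL && val != NULL);

    for (e = logical_replication_mode_options; e->name != NULL; e++)
    {
        if (strcmp(e->name, name) == 0)
        {
            *val = e->val;
            return 1;
        }
    }
    return 0;
}
```

The table is terminated by a NULL name, so a lookup for an unknown string simply falls off the end and reports failure.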

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-24 Thread houzj.f...@fujitsu.com
On Tuesday, January 24, 2023 3:19 PM Peter Smith  wrote:
> 
> Here are some review comments for v86-0002
> 
> ==
> Commit message
> 
> 1.
> Use the use the existing developer option logical_replication_mode to test the
> parallel apply of large transaction on subscriber.
> 
> ~
> 
> Typo “Use the use the”
> 
> SUGGESTION (rewritten)
> Give additional functionality to the existing developer option
> 'logical_replication_mode' to help test parallel apply of large transactions 
> on the
> subscriber.

Changed.

> ~~~
> 
> 2.
> Maybe that commit message should also say extra TAP tests that have been
> added to exercise the serialization part of the parallel apply?

Added.

> BTW – I can see the TAP tests are testing full serialization (when the GUC is
> 'immediate') but I not sure how is "partial" serialization (when it has to 
> switch
> halfway from shmem to files) being tested.

The new tests are intended to test most of the new code path for partial
serialization by doing it from the beginning. Later, if required, we can add
different tests for it.

> 
> ==
> doc/src/sgml/config.sgml
> 
> 3.
> Allows streaming or serializing changes immediately in logical decoding. The
> allowed values of logical_replication_mode are buffered and immediate. When
> set to immediate, stream each change if streaming option (see optional
> parameters set by CREATE SUBSCRIPTION) is enabled, otherwise, serialize each
> change. When set to buffered, which is the default, decoding will stream or
> serialize changes when logical_decoding_work_mem is reached.
> On the subscriber side, if streaming option is set to parallel, this 
> parameter also
> allows the leader apply worker to send changes to the shared memory queue or
> to serialize changes. When set to buffered, the leader sends changes to 
> parallel
> apply workers via shared memory queue. When set to immediate, the leader
> serializes all changes to files and notifies the parallel apply workers to 
> read and
> apply them at the end of the transaction.
> 
> ~
> 
> Because now this same developer GUC affects both the publisher side and the
> subscriber side differently IMO this whole description should be re-structured
> accordingly.
> 
> SUGGESTION (something like)
> 
> The allowed values of logical_replication_mode are buffered and immediate. The
> default is buffered.
> 
> On the publisher side, ...
> 
> On the subscriber side, ...

Changed.

> 
> ~~~
> 
> 4.
> This parameter is intended to be used to test logical decoding and 
> replication of
> large transactions for which otherwise we need to generate the changes till
> logical_decoding_work_mem is reached.
> 
> ~
> 
> Maybe this paragraph needs rewording or moving. e.g. Isn't that misleading
> now? Although this might be an explanation for the publisher side, it does not
> seem relevant to the subscriber side's behaviour.

Adjusted the description here.

> 
> ==
> .../replication/logical/applyparallelworker.c
> 
> 5.
> @@ -1149,6 +1149,9 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size
> nbytes, const void *data)
>   Assert(!IsTransactionState());
>   Assert(!winfo->serialize_changes);
> 
> + if (logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE)
> +     return false;
> +
> 
> I felt that code should have some comment, even if it is just something quite
> basic like "/* For developer testing */"

Added.
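
For readers following along, the effect of that early return can be sketched in isolation. This is a simplified model, not the applyparallelworker.c code: WorkerInfo, send_data() and handle_change() are hypothetical stand-ins, and the shared memory queue and file writes are reduced to byte counters.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for the two values of the developer option. */
typedef enum { MODE_BUFFERED, MODE_IMMEDIATE } RepMode;

typedef struct
{
    bool   serialize_changes;   /* true once the leader has switched to files */
    size_t sent_via_queue;      /* bytes handed to the (simulated) queue */
    size_t written_to_file;     /* bytes written to the (simulated) file */
} WorkerInfo;

static RepMode logical_replication_mode = MODE_BUFFERED;

/* Returns false when the caller should fall back to serializing. */
static bool
send_data(WorkerInfo *winfo, size_t nbytes)
{
    assert(!winfo->serialize_changes);

    /*
     * For developer testing: report failure right away so that the
     * serialization path is exercised from the first change.
     */
    if (logical_replication_mode == MODE_IMMEDIATE)
        return false;

    winfo->sent_via_queue += nbytes;
    return true;
}

/* Leader side: try the queue first, otherwise serialize from here on. */
static void
handle_change(WorkerInfo *winfo, size_t nbytes)
{
    if (!winfo->serialize_changes && send_data(winfo, nbytes))
        return;

    winfo->serialize_changes = true;
    winfo->written_to_file += nbytes;
}
```

Once serialize_changes is set, later changes skip the queue entirely, which is why the switch is one-way within a transaction in this model.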

> 
> ==
> .../t/018_stream_subxact_abort.pl
> 
> 6.
> +# Clean up test data from the environment.
> +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
> +$node_publisher->wait_for_catchup($appname);
> 
> Is it necessary to TRUNCATE the table here? If everything is working shouldn't
> the data be rolled back anyway?

I think it's unnecessary, so removed.

> 
> ~~~
> 
> 7.
> +$node_publisher->safe_psql(
> + 'postgres', q{
> + BEGIN;
> + INSERT INTO test_tab_2 values(1);
> + SAVEPOINT sp;
> + INSERT INTO test_tab_2 values(1);
> + ROLLBACK TO sp;
> + COMMIT;
> + });
> 
> Perhaps this should insert 2 different values so then the verification code 
> can
> check the correct value remains instead of just checking COUNT(*)?

I think testing the count should be ok as the nearby testcases are
also checking the count.

Best regards,
Hou zj



v87-0001-Rename-logical_decoding_mode-to-logical_replicat.patch
Description:  v87-0001-Rename-logical_decoding_mode-to-logical_replicat.patch


v87-0002-Extend-the-logical_replication_mode-to-test-the-.patch
Description:  v87-0002-Extend-the-logical_replication_mode-to-test-the-.patch


v87-0002-Extend-the-logical_replication_mode-to-test-the-.patch
Description:  v87-0002-Extend-the-logical_replication_mode-to-test-the-.patch


v87-0001-Rename-logical_decoding_mode-to-logical_replicat.patch
Description:  v87-0001-Rename-logical_decoding_mode-to-logical_replicat.patch


RE: wake up logical workers after ALTER SUBSCRIPTION

2023-01-23 Thread houzj.f...@fujitsu.com
On Monday, January 23, 2023 3:13 AM Tom Lane  wrote:

Hi,

> 
> Nathan Bossart  writes:
> > On Tue, Jan 10, 2023 at 10:59:14AM +0530, Amit Kapila wrote:
> >> I haven't looked in detail but isn't it better to explain somewhere
> >> in the comments that it achieves to rate limit the restart of workers
> >> in case of error and allows them to restart immediately in case of
> >> subscription parameter change?
> 
> > I expanded one of the existing comments to make this clear.
> 
> I pushed v17 with some mostly-cosmetic changes, including more comments.

I noticed one minor thing in this commit. 

-
LogicalRepCtx->last_start_dsh = DSM_HANDLE_INVALID;
-

The code treats last_start_dsh as a dsm_handle, but it is actually a
dsa_pointer ("typedef dsa_pointer dshash_table_handle;"). This won't cause any
problem, but I feel it would be easier to understand if we treat it as a
dsa_pointer and use InvalidDsaPointer here, as the attached patch does.
What do you think?

Best regards,
Hou zj





0001-Take-last_start_dsh-as-dsa_pointer.patch
Description: 0001-Take-last_start_dsh-as-dsa_pointer.patch


RE: Logical replication timeout problem

2023-01-23 Thread houzj.f...@fujitsu.com
On Monday, January 23, 2023 8:51 AM Peter Smith  wrote:
> 
> Here are my review comments for patch v4-0001
> ==
> Commit message
> 
> 2.
> 
> The problem is when there is a DDL in a transaction that generates lots of
> temporary data due to rewrite rules, these temporary data will not be 
> processed
> by the pgoutput plugin. The previous commit (f95d53e) only fixed timeouts
> caused by filtering out changes in pgoutput. Therefore, the previous fix for 
> DML
> had no impact on this case.
> 
> ~
> 
> IMO this still some rewording to say up-front what the the actual problem -- 
> i.e.
> an avoidable timeout occuring.
> 
> SUGGESTION (or something like this...)
> 
> When there is a DDL in a transaction that generates lots of temporary data due
> to rewrite rules, this temporary data will not be processed by the pgoutput
> plugin. This means it is possible for a timeout to occur if a sufficiently 
> long time
> elapses since the last pgoutput message. A previous commit (f95d53e) fixed a
> similar scenario in this area, but that only fixed timeouts for DML going 
> through
> pgoutput, so it did not address this DDL timeout case.

Thanks, I changed the commit message as suggested.

> ==
> src/backend/replication/logical/logical.c
> 
> 3. update_progress_txn_cb_wrapper
> 
> +/*
> + * Update progress callback while processing a transaction.
> + *
> + * Try to update progress and send a keepalive message during sending data of a
> + * transaction (and its subtransactions) to the output plugin.
> + *
> + * For a large transaction, if we don't send any change to the downstream for a
> + * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
> + * This can happen when all or most of the changes are either not published or
> + * got filtered out.
> + */
> +static void
> +update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> +ReorderBufferChange *change)
> 
> Simplify the "Try to..." paragraph. And other part should also mention about 
> DDL.
> 
> SUGGESTION
> 
> Try send a keepalive message during transaction processing.
> 
> This is done because if we don't send any change to the downstream for a long
> time (exceeds the wal_receiver_timeout of standby), then it can timeout. This 
> can
> happen for large DDL, or for large transactions when all or most of the 
> changes
> are either not published or got filtered out.

Changed.

> ==
> .../replication/logical/reorderbuffer.c
> 
> 4. ReorderBufferProcessTXN
> 
> @@ -2105,6 +2105,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
> 
>   PG_TRY();
>   {
> + /*
> + * Static variable used to accumulate the number of changes while
> + * processing txn.
> + */
> + static int changes_count = 0;
> +
> + /*
> + * Sending keepalive messages after every change has some overhead, but
> + * testing showed there is no noticeable overhead if keepalive is only
> + * sent after every ~100 changes.
> + */
> +#define CHANGES_THRESHOLD 100
> +
> 
> IMO these can be relocated to be declared/defined inside the "while"
> loop -- i.e. closer to where they are being used.

Moved into the while loop.
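
The counting scheme can be sketched on its own as follows. This is only an illustration of the idea, with the declarations placed next to the loop as suggested; process_changes() and update_progress() are hypothetical names, and the real callback is replaced by a counter.

```c
#include <assert.h>

/* Send a keepalive only after this many changes, as in the quoted hunk. */
#define CHANGES_THRESHOLD 100

static int progress_updates = 0;   /* counts calls to the stand-in callback */

/* Stand-in for the progress/keepalive callback. */
static void
update_progress(void)
{
    progress_updates++;
}

/* Process nchanges changes, updating progress every CHANGES_THRESHOLD. */
static void
process_changes(int nchanges)
{
    /* Static, so the count carries over between calls. */
    static int changes_count = 0;
    int        i;

    assert(nchanges >= 0);

    for (i = 0; i < nchanges; i++)
    {
        /* ... the change itself would be processed here ... */

        if (++changes_count >= CHANGES_THRESHOLD)
        {
            update_progress();
            changes_count = 0;
        }
    }
}
```

Because the counter is static, a run of 250 changes followed by a run of 50 triggers three updates in total rather than two.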

Attached is the new version of the patch, which addresses the above comments.
Also attached is a simple script which uses "refresh matview" to reproduce
this timeout problem, just in case someone wants to try to reproduce it.

Best regards,
Hou zj


test.sh
Description: test.sh


v5-0001-Fix-the-logical-replication-timeout-during-proces.patch
Description:  v5-0001-Fix-the-logical-replication-timeout-during-proces.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-23 Thread houzj.f...@fujitsu.com
On Monday, January 23, 2023 11:17 AM Amit Kapila  
wrote:
> 
> On Fri, Jan 20, 2023 at 11:48 AM Masahiko Sawada 
> wrote:
> >
> > >
> > > Yet another way is to use the existing parameter logical_decode_mode
> > > [1]. If the value of logical_decoding_mode is 'immediate', then we
> > > can immediately switch to partial serialize mode. This will
> > > eliminate the dependency on timing. The one argument against using
> > > this is that it won't be as clear as a separate parameter like
> > > 'stream_serialize_threshold' proposed by the patch but OTOH we
> > > already have a few parameters that serve a different purpose when
> > > used on the subscriber. For example, 'max_replication_slots' is used
> > > to define the maximum number of replication slots on the publisher
> > > and the maximum number of origins on subscribers. Similarly,
> > > wal_retrieve_retry_interval' is used for different purposes on
> > > subscriber and standby nodes.
> >
> > Using the existing parameter makes sense to me. But if we use
> > logical_decoding_mode also on the subscriber, as Shveta Malik also
> > suggested, probably it's better to rename it so as not to confuse. For
> > example, logical_replication_mode or something.
> >
> 
> +1. Among the options discussed, this sounds better.

OK, here is the patch set which does the same.
The first patch only renames the GUC, and the second patch uses
the GUC to test partial serialization.

Best Regards,
Hou zj



v86-0002-Extend-the-logical_replication_mode-to-test-the-stre.patch
Description:  v86-0002-Extend-the-logical_replication_mode-to-test-the-stre.patch


v86-0001-Rename-logical_decoding_mode-to-logical_replication_.patch
Description:  v86-0001-Rename-logical_decoding_mode-to-logical_replication_.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-17 Thread houzj.f...@fujitsu.com
On Tuesday, January 17, 2023 12:34 PM shveta malik  
wrote:
> 
> On Tue, Jan 17, 2023 at 9:07 AM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Tuesday, January 17, 2023 11:32 AM Peter Smith
>  wrote:
> > > OK. I didn't know there was another header convention that you were
> > > following.
> > > In that case, it is fine to leave the name as-is.
> >
> > Thanks for confirming!
> >
> > Attach the new version 0001 patch which addressed all other comments.
> >
> > Best regards,
> > Hou zj
> 
> Hello Hou-san,
> 
> 1. Do we need to extend test-cases to review the leader_pid column in pg_stats
> tables?

Thanks for the comments.

We currently don't have any tests for the view, so I feel we can extend
them later as a separate patch.

> 2. Do we need to follow the naming convention for
> 'GetLeaderApplyWorkerPid' like other functions in the same file which starts
> with 'logicalrep_'

We have agreed [1] to follow the naming convention used for the functions in
logicallauncher.h, which are mainly used by other modules.

[1] 
https://www.postgresql.org/message-id/CAHut%2BPtgj%3DDY8F1cMBRUxsZtq2-faW%3D%3D5-dSuHSPJGx1a_vBFQ%40mail.gmail.com

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-17 Thread houzj.f...@fujitsu.com
On Tuesday, January 17, 2023 2:46 PM Masahiko Sawada  
wrote:
> 
> On Tue, Jan 17, 2023 at 12:37 PM houzj.f...@fujitsu.com
>  wrote:
> > Attach the new version 0001 patch which addressed all other comments.
> >
> 
> Thank you for updating the patch. Here is one comment:
> 
> @@ -426,14 +427,24 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
> 
>             /*
>              * Show the leader only for active parallel workers.  This
> -            * leaves the field as NULL for the leader of a parallel
> -            * group.
> +            * leaves the field as NULL for the leader of a parallel group
> +            * or the leader of parallel apply workers.
>              */
>             if (leader && leader->pid != beentry->st_procpid)
>             {
>                 values[28] = Int32GetDatum(leader->pid);
>                 nulls[28] = false;
>             }
> +           else
> +           {
> +               int leader_pid = GetLeaderApplyWorkerPid(beentry->st_procpid);
> +
> +               if (leader_pid != InvalidPid)
> +               {
> +                   values[28] = Int32GetDatum(leader_pid);
> +                   nulls[28] = false;
> +               }
> +           }
>         }
> 
> I'm slightly concerned that there could be overhead of executing
> GetLeaderApplyWorkerPid () for every backend process except for parallel
> query workers. The number of such backends could be large and
> GetLeaderApplyWorkerPid() acquires the lwlock. For example, does it make
> sense to check (st_backendType == B_BG_WORKER) before calling
> GetLeaderApplyWorkerPid()? Or it might not be a problem since it's
> LogicalRepWorkerLock which is not likely to be contended.

Thanks for the comment; I think your suggestion makes sense.
I have added the check before getting the leader pid. Here is the new version
of the patch.
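
The shape of that check, reduced to a standalone sketch, is shown below. Everything here is hypothetical: BackendKind, leader_pid_for() and the fake lookup are illustrative names, and the lock is modelled as a counter so the saving is visible.

```c
#include <assert.h>

#define INVALID_PID (-1)

/* Simplified backend classification; only the distinction matters here. */
typedef enum { BACKEND_CLIENT, BACKEND_BG_WORKER } BackendKind;

static int lock_acquisitions = 0;   /* how often the "lock" was taken */

/* Stand-in for the lookup that takes a lock and scans the worker array. */
static int
get_leader_apply_worker_pid(int pid)
{
    lock_acquisitions++;
    return (pid == 4242) ? 4000 : INVALID_PID;   /* one fake parallel worker */
}

/* Only background workers can be parallel apply workers, so skip others. */
static int
leader_pid_for(BackendKind kind, int pid)
{
    assert(pid > 0);

    if (kind != BACKEND_BG_WORKER)
        return INVALID_PID;

    return get_leader_apply_worker_pid(pid);
}
```

With the guard in place, ordinary client backends never reach the lookup, so the lock is taken only for the comparatively few background workers.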

Best regards,
Hou zj


v83-0001-Display-the-leader-apply-worker-s-PID-for-parall.patch
Description:  v83-0001-Display-the-leader-apply-worker-s-PID-for-parall.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-16 Thread houzj.f...@fujitsu.com
On Tuesday, January 17, 2023 12:55 PM Amit Kapila  
wrote:
> 
> On Tue, Jan 17, 2023 at 8:59 AM Amit Kapila  wrote:
> >
> > On Tue, Jan 17, 2023 at 8:35 AM Masahiko Sawada
>  wrote:
> > >
> > > On Mon, Jan 16, 2023 at 3:19 PM Amit Kapila 
> wrote:
> > > >
> > > > Okay, I have added the comments in get_transaction_apply_action()
> > > > and updated the comments to refer to the enum TransApplyAction
> > > > where all the actions are explained.
> > >
> > > Thank you for the patch.
> > >
> > > @@ -1710,6 +1712,7 @@ apply_handle_stream_stop(StringInfo s)
> > > }
> > >
> > > in_streamed_transaction = false;
> > > +   stream_xid = InvalidTransactionId;
> > >
> > > We reset stream_xid also in stream_close_file() but probably it's no
> > > longer necessary?
> > >
> >
> > I think so.
> >
> > > How about adding an assertion in apply_handle_stream_start() to make
> > > sure the stream_xid is invalid?
> > >
> >
> > I think it would be better to add such an assert in
> > apply_handle_begin/apply_handle_begin_prepare because there won't be a
> > problem if we start_stream message even when stream_xid is valid.
> > However, maybe it is better to add in all three functions
> > (apply_handle_begin/apply_handle_begin_prepare/apply_handle_stream_start).
> > What do you think?
> >
> > > ---
> > > It's not related to this issue but I realized that if the action
> > > returned by get_transaction_apply_action() is not handled in the
> > > switch statement, we do only Assert(false). Is it better to raise an
> > > error like "unexpected apply action %d" just in case in order to
> > > detect failure cases also in the production environment?
> > >
> >
> > Yeah, that may be better. Shall we do that as part of this patch only
> > or as a separate patch?
> >
> 
> Please find attached the updated patches to address the above comments. I
> think we can combine and commit them as one patch as both are related.

Thanks for fixing these.
I have confirmed that all regression tests passed after applying the patches.
And the patches look good to me.
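
As a rough illustration of the second point (raising an error for an unhandled action instead of relying on Assert(false) alone), here is a standalone sketch. It does not use the real TransApplyAction enum or ereport(): ApplyAction and dispatch_apply_action() are hypothetical, and the error is reported through a buffer and a return code.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical, shortened list of actions for this example. */
typedef enum
{
    ACTION_APPLY,
    ACTION_SERIALIZE,
    ACTION_SEND_TO_WORKER
} ApplyAction;

/* Returns 0 on success; -1 and a message in errbuf for an unknown action. */
static int
dispatch_apply_action(int action, char *errbuf, size_t errlen)
{
    assert(errbuf != NULL && errlen > 0);

    switch (action)
    {
        case ACTION_APPLY:
        case ACTION_SERIALIZE:
        case ACTION_SEND_TO_WORKER:
            /* ... the action would be handled here ... */
            errbuf[0] = '\0';
            return 0;

        default:
            /* Reported in all builds, unlike an assertion. */
            snprintf(errbuf, errlen, "unexpected apply action %d", action);
            return -1;
    }
}
```

The point of the default branch is that the failure remains detectable in builds where assertions are compiled out.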

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-16 Thread houzj.f...@fujitsu.com
On Tuesday, January 17, 2023 11:32 AM Peter Smith  wrote:
> 
> On Tue, Jan 17, 2023 at 1:21 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Tuesday, January 17, 2023 5:43 AM Peter Smith
>  wrote:
> > >
> > > On Mon, Jan 16, 2023 at 5:41 PM Amit Kapila
> > > 
> > > wrote:
> > > >
> > > > On Mon, Jan 16, 2023 at 10:24 AM Peter Smith
> > > > 
> > > wrote:
> > > > >
> > > > > 2.
> > > > >
> > > > >  /*
> > > > > + * Return the pid of the leader apply worker if the given pid
> > > > > +is the pid of a
> > > > > + * parallel apply worker, otherwise return InvalidPid.
> > > > > + */
> > > > > +pid_t
> > > > > > +GetLeaderApplyWorkerPid(pid_t pid)
> > > > > > +{
> > > > > > + int leader_pid = InvalidPid;
> > > > > > + int i;
> > > > > > +
> > > > > > + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> > > > > > +
> > > > > > + for (i = 0; i < max_logical_replication_workers; i++)
> > > > > > + {
> > > > > > + LogicalRepWorker *w = &LogicalRepCtx->workers[i];
> > > > > > +
> > > > > > + if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
> > > > > > + {
> > > > > > + leader_pid = w->leader_pid;
> > > > > > + break;
> > > > > > + }
> > > > > > + }
> > > > > +
> > > > > + LWLockRelease(LogicalRepWorkerLock);
> > > > > +
> > > > > + return leader_pid;
> > > > > +}
> > > > >
> > > > > 2a.
> > > > > IIUC the IsParallelApplyWorker macro does nothing except check
> > > > > that the leader_pid is not InvalidPid anyway, so AFAIK this
> > > > > algorithm does not benefit from using this macro because we will
> > > > > want to return InvalidPid anyway if the given pid matches.
> > > > >
> > > > > So the inner condition can just say:
> > > > >
> > > > > if (w->proc && w->proc->pid == pid) { leader_pid =
> > > > > w->leader_pid; break; }
> > > > >
> > > >
> > > > Yeah, this should also work but I feel the current one is explicit
> > > > and more clear.
> > >
> > > OK.
> > >
> > > But, I have one last comment about this function -- I saw there are
> > > already other functions that iterate max_logical_replication_workers
> > > like this looking for things:
> > > - logicalrep_worker_find
> > > - logicalrep_workers_find
> > > - logicalrep_worker_launch
> > > - logicalrep_sync_worker_count
> > >
> > > So I felt this new function (currently called
> > > GetLeaderApplyWorkerPid) ought to be named similarly to those ones.
> > > e.g. call it something like "logicalrep_worker_find_pa_leader_pid".
> > >
> >
> > I am not sure we can use the name, because currently all the API name
> > in launcher that used by other module(not related to subscription) are
> > like AxxBxx style(see the functions in logicallauncher.h).
> > logicalrep_worker_xxx style functions are currently only declared in
> > worker_internal.h.
> >
> 
> OK. I didn't know there was another header convention that you were following.
> In that case, it is fine to leave the name as-is.

Thanks for confirming!

Attached is the new version of the 0001 patch, which addresses all other comments.

Best regards,
Hou zj


v82-0001-Display-the-leader-apply-worker-s-PID-for-parall.patch
Description:  v82-0001-Display-the-leader-apply-worker-s-PID-for-parall.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-16 Thread houzj.f...@fujitsu.com
On Tuesday, January 17, 2023 5:43 AM Peter Smith  wrote:
> 
> On Mon, Jan 16, 2023 at 5:41 PM Amit Kapila 
> wrote:
> >
> > On Mon, Jan 16, 2023 at 10:24 AM Peter Smith 
> wrote:
> > >
> > > 2.
> > >
> > >  /*
> > > + * Return the pid of the leader apply worker if the given pid is
> > > +the pid of a
> > > + * parallel apply worker, otherwise return InvalidPid.
> > > + */
> > > +pid_t
> > > +GetLeaderApplyWorkerPid(pid_t pid)
> > > +{
> > > + int leader_pid = InvalidPid;
> > > + int i;
> > > +
> > > + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> > > +
> > > > + for (i = 0; i < max_logical_replication_workers; i++)
> > > > + {
> > > > + LogicalRepWorker *w = &LogicalRepCtx->workers[i];
> > > > +
> > > > + if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
> > > > + {
> > > > + leader_pid = w->leader_pid;
> > > > + break;
> > > > + }
> > > > + }
> > > +
> > > + LWLockRelease(LogicalRepWorkerLock);
> > > +
> > > + return leader_pid;
> > > +}
> > >
> > > 2a.
> > > IIUC the IsParallelApplyWorker macro does nothing except check that
> > > the leader_pid is not InvalidPid anyway, so AFAIK this algorithm
> > > does not benefit from using this macro because we will want to
> > > return InvalidPid anyway if the given pid matches.
> > >
> > > So the inner condition can just say:
> > >
> > > if (w->proc && w->proc->pid == pid)
> > > {
> > > leader_pid = w->leader_pid;
> > > break;
> > > }
> > >
> >
> > Yeah, this should also work but I feel the current one is explicit and
> > more clear.
> 
> OK.
> 
> But, I have one last comment about this function -- I saw there are already
> other functions that iterate max_logical_replication_workers like this looking
> for things:
> - logicalrep_worker_find
> - logicalrep_workers_find
> - logicalrep_worker_launch
> - logicalrep_sync_worker_count
> 
> So I felt this new function (currently called GetLeaderApplyWorkerPid) ought
> to be named similarly to those ones. e.g. call it something like
> "logicalrep_worker_find_pa_leader_pid".
> 

I am not sure we can use that name, because currently all the API names in the
launcher that are used by other modules (not related to subscription) follow
the AxxBxx style (see the functions in logicallauncher.h).
Functions in the logicalrep_worker_xxx style are currently only declared in
worker_internal.h.
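
Regarding comment 2a quoted above, the two forms of the inner condition can be compared in a small standalone model. This is a sketch under simplifying assumptions: Worker is a hypothetical struct, in_use stands in for w->proc, the array size is fixed, and the lock around the loop is omitted.

```c
#include <assert.h>
#include <stddef.h>

#define INVALID_PID (-1)
#define MAX_WORKERS 4

typedef struct
{
    int in_use;      /* slot has a live process (stand-in for w->proc) */
    int pid;
    int leader_pid;  /* INVALID_PID unless this is a parallel apply worker */
} Worker;

/* Models the macro: a parallel apply worker is one that has a leader. */
static int
is_parallel_apply_worker(const Worker *w)
{
    return w->leader_pid != INVALID_PID;
}

/* The explicit form kept in the patch. */
static int
leader_pid_explicit(const Worker *workers, int pid)
{
    int i;

    assert(workers != NULL);

    for (i = 0; i < MAX_WORKERS; i++)
    {
        const Worker *w = &workers[i];

        if (is_parallel_apply_worker(w) && w->in_use && w->pid == pid)
            return w->leader_pid;
    }
    return INVALID_PID;
}

/* The shorter form from comment 2a: a non-parallel worker that matches
 * yields its own leader_pid, which is INVALID_PID anyway. */
static int
leader_pid_short(const Worker *workers, int pid)
{
    int i;

    assert(workers != NULL);

    for (i = 0; i < MAX_WORKERS; i++)
    {
        const Worker *w = &workers[i];

        if (w->in_use && w->pid == pid)
            return w->leader_pid;
    }
    return INVALID_PID;
}
```

In this model both functions return the same result for every pid, which matches the conclusion in the thread that the choice is about readability rather than behaviour.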

Best regards,
Hou zj



RE: logrep stuck with 'ERROR: int2vector has too many elements'

2023-01-15 Thread houzj.f...@fujitsu.com
On Sunday, January 15, 2023 5:35 PM Erik Rijkers  wrote:
> 
> I can't find the exact circumstances that cause it but it has something to do 
> with
> many columns (or adding many columns) in combination with perhaps
> generated columns.
> 
> This replication test, in a slightly different form, used to work. This is 
> also
> suggested by the fact that the attached runs without errors in REL_15_STABLE 
> but
> gets stuck in HEAD.
> 
> What it does: it initdbs and runs two instances, primary and replica. In the
> primary 'pgbench -is1' done, and many columns, including 1 generated column,
> are added to all 4 pgbench tables. This is then pg_dump/pg_restored to the
> replica, and a short pgbench is run. The result tables on primary and replica 
> are
> compared for the final result.
> (To run it will need some tweaks to directory and connection parms)
> 
> I ran it on both v15 and v16 for 25 runs: with the parameters as given
> 15 has no problem while 16 always got stuck with the int2vector error.
> (15 can actually be pushed up to the max of 1600 columns per table without
> errors)
> 
> Both REL_15_STABLE and 16devel built from recent master on Debian 10, gcc
> 12.2.0.
> 
> I hope someone understands what's going wrong.

Thanks for reporting.

I think the basic problem is that we try to fetch the column list as an
int2vector when doing table sync, and if the number of columns is larger than
100, we get an ERROR like the one in $subject.

We can also hit this ERROR by manually specifying a long (>100) column
list in the publication, like:

create publication pub for table test(a1,a2,a3... a200);
create subscription xxx.

The script didn't reproduce this in PG15 because there we didn't filter out
generated columns when fetching the column list, so it assumed all columns are
replicated and returned NULL for the column list (int2vector) value. But in
PG16 (b7ae039), we started to filter out generated columns (because generated
columns are not replicated in logical replication), so we get a valid
int2vector and hit the ERROR.
I will think about and work on a fix for this.
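
To make the failure mode concrete, here is a toy parser with a fixed element cap. It is not the server's int2vector input function: parse_int_vector() and MAX_ELEMENTS are assumptions for illustration, with the cap set to 100 only because that is the limit mentioned above.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>

/* Assumed cap, taken from the "larger than 100" observation above. */
#define MAX_ELEMENTS 100

/*
 * Parse space-separated integers into out[].
 * Returns the element count, -1 if there are too many elements,
 * or -2 if the input contains something that is not a number.
 */
static int
parse_int_vector(const char *s, short *out)
{
    int n = 0;

    assert(s != NULL && out != NULL);

    while (*s != '\0')
    {
        char *end;
        long  v;

        while (isspace((unsigned char) *s))
            s++;
        if (*s == '\0')
            break;

        v = strtol(s, &end, 10);
        if (end == s)
            return -2;          /* not a number */
        if (n >= MAX_ELEMENTS)
            return -1;          /* "too many elements" */

        out[n++] = (short) v;
        s = end;
    }
    return n;
}
```

A column list of exactly 100 entries still parses in this model, while the 101st entry triggers the error, which mirrors why wide tables only fail once a real (non-NULL) column list is produced.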

Best regards,
Hou zj


BF animal malleefowl reported an failure in 001_password.pl

2023-01-13 Thread houzj.f...@fujitsu.com
Hi,

I noticed one BF failure[1] when monitoring the BF for some other commit.

#   Failed test 'authentication success for method password, connstr user=scram_role: log matches'
#   at t/001_password.pl line 120.
#   '2023-01-13 07:33:46.741 EST [243628:5] LOG:  received SIGHUP, reloading configuration files
# 2023-01-13 07:33:46.742 EST [243662:1] [unknown] LOG:  connection received: host=[local]
# 2023-01-13 07:33:46.744 EST [243662:2] [unknown] LOG:  connection authorized: user=scram_role database=postgres application_name=001_password.pl
# 2023-01-13 07:33:46.748 EST [243662:3] 001_password.pl LOG:  statement: SELECT $$connected with user=scram_role$$
# '
# doesn't match '(?^:connection authenticated: identity="scram_role" method=password)'
# Looks like you failed 1 test of 79.
[08:33:47] t/001_password.pl

After checking the test and log, I can see the test failed at the following code:

# For plain "password" method, all users should also be able to connect.
reset_pg_hba($node, 'all', 'all', 'password');
test_conn($node, 'user=scram_role', 'password', 0,
    log_like =>
      [qr/connection authenticated: identity="scram_role" method=password/]);


From the log, the expected "xxx method=password " log was not output; a simple
"connection authorized: user=scram_role database=postgres " was output instead.
So it seems the connection happens before pg_ident.conf is actually reloaded?
I am not sure if we need to do something to make sure the reload happens,
because it looks like a very rare failure which hasn't happened in the last 90 days.

[1] 
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=malleefowl&dt=2023-01-13%2009%3A54%3A51

Best regards,
Hou zhijie





RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-13 Thread houzj.f...@fujitsu.com
On Friday, January 13, 2023 1:43 PM Masahiko Sawada  
wrote:
> On Thu, Jan 12, 2023 at 9:34 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Thursday, January 12, 2023 7:08 PM Amit Kapila
>  wrote:
> > >
> > > On Thu, Jan 12, 2023 at 4:21 PM shveta malik 
> wrote:
> > > >
> > > > On Thu, Jan 12, 2023 at 10:34 AM Amit Kapila
> > > > 
> > > wrote:
> > > > >
> > > > > On Thu, Jan 12, 2023 at 9:54 AM Peter Smith
> > > > > 
> > > wrote:
> > > > > >
> > > > > >
> > > > > > doc/src/sgml/monitoring.sgml
> > > > > >
> > > > > > 5. pg_stat_subscription
> > > > > >
> > > > > > @@ -3198,11 +3198,22 @@ SELECT pid, wait_event_type,
> > > > > > wait_event FROM pg_stat_activity WHERE wait_event i
> > > > > >
> > > > > >   
> > > > > > > > > > > role="column_definition">
> > > > > > +   apply_leader_pid
> > > integer
> > > > > > +  
> > > > > > +  
> > > > > > +   Process ID of the leader apply worker, if this process is a
> apply
> > > > > > +   parallel worker. NULL if this process is a leader apply 
> > > > > > worker
> or a
> > > > > > +   synchronization worker.
> > > > > > +  
> > > > > > + 
> > > > > > +
> > > > > > + 
> > > > > > +   > > > > > + role="column_definition">
> > > > > > relid oid
> > > > > >
> > > > > >
> > > > > > OID of the relation that the worker is synchronizing; null 
> > > > > > for
> the
> > > > > > -   main apply worker
> > > > > > +   main apply worker and the parallel apply worker
> > > > > >
> > > > > >   
> > > > > >
> > > > > > 5a.
> > > > > >
> > > > > > (Same as general comment #1 about terminology)
> > > > > >
> > > > > > "apply_leader_pid" --> "leader_apply_pid"
> > > > > >
> > > > >
> > > > > How about naming this as just leader_pid? I think it could be
> > > > > helpful in the future if we decide to parallelize initial sync
> > > > > (aka parallel
> > > > > copy) because then we could use this for the leader PID of
> > > > > parallel sync workers as well.
> > > > >
> > > > > --
> > > >
> > > > I still prefer leader_apply_pid.
> > > > leader_pid does not tell which 'operation' it belongs to. 'apply'
> > > > gives the clarity that it is apply related process.
> > > >
> > >
> > > But then do you suggest that tomorrow if we allow parallel sync
> > > workers then we have a separate column leader_sync_pid? I think that
> > > doesn't sound like a good idea and moreover one can refer to docs for
> clarification.
> >
> > I agree that leader_pid would be better not only for future parallel
> > copy sync feature, but also it's more consistent with the leader_pid column 
> > in
> pg_stat_activity.
> >
> > And here is the version patch which addressed Peter's comments and
> > renamed all the related stuff to leader_pid.
> 
> Here are two comments on v79-0003 patch.

Thanks for the comments.

> 
> +/* Force to serialize messages if stream_serialize_threshold
> is reached. */
> +if (stream_serialize_threshold != -1 &&
> +(stream_serialize_threshold == 0 ||
> + stream_serialize_threshold < parallel_stream_nchunks))
> +{
> +parallel_stream_nchunks = 0;
> +return false;
> +}
> 
> I think it would be better if we show the log message ""logical replication 
> apply
> worker will serialize the remaining changes of remote transaction %u to a 
> file"
> even in stream_serialize_threshold case.

Agreed and changed.

> 
> IIUC parallel_stream_nchunks won't be reset if pa_send_data() failed due to 
> the
> timeout.

Changed.
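
For readers following along, the threshold check and the reset discussed above can be sketched roughly as below. This is only an illustration, not the patch's code: try_send_chunk() and its queue_send_ok argument are hypothetical, and where the counter is incremented is an assumption. Only the condition itself is taken from the quoted hunk.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for the GUC and counter named in the quoted hunk. */
int stream_serialize_threshold = -1;   /* -1 disables forced serialization */
int parallel_stream_nchunks = 0;

/*
 * Hypothetical helper: returns true if the chunk went to the parallel
 * apply worker, false if the caller should serialize to a file instead.
 * queue_send_ok models whether the send succeeded (false = timeout).
 */
bool
try_send_chunk(bool queue_send_ok)
{
    assert(stream_serialize_threshold >= -1);

    /* Force serialization once the threshold is reached (0 = always). */
    if (stream_serialize_threshold != -1 &&
        (stream_serialize_threshold == 0 ||
         stream_serialize_threshold < parallel_stream_nchunks))
    {
        parallel_stream_nchunks = 0;
        return false;
    }

    /* Timeout path: reset the counter here as well. */
    if (!queue_send_ok)
    {
        parallel_stream_nchunks = 0;
        return false;
    }

    /* Assumption: the counter is bumped after each successful send. */
    parallel_stream_nchunks++;
    return true;
}
```

With the threshold set to 2, the first three sends go through and the fourth is redirected to serialization; a failed send resets the counter regardless of the threshold.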

Best Regards,
Hou zj



RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-13 Thread houzj.f...@fujitsu.com
On Friday, January 13, 2023 1:02 PM Masahiko Sawada  
wrote:
> 
> On Fri, Jan 13, 2023 at 1:28 PM Amit Kapila  wrote:
> >
> > On Fri, Jan 13, 2023 at 9:06 AM Amit Kapila 
> wrote:
> > >
> > > On Fri, Jan 13, 2023 at 7:56 AM Peter Smith 
> wrote:
> > > >
> > >
> > > >
> > > > 3.
> > > >
> > > > > > > role="column_definition">
> > > > +   leader_pid integer
> > > > +  
> > > > +  
> > > > +   Process ID of the leader apply worker if this process is a 
> > > > parallel
> > > > +   apply worker; NULL if this process is a leader apply worker or
> does not
> > > > +   participate in parallel apply, or a synchronization worker
> > > > +  
> > > >
> > > > I felt this change is giving too many details and ended up just
> > > > muddying the water.
> > > >
> > >
> > > I see that we give a similar description for other parameters as well.
> > > For example leader_pid in pg_stat_activity,
> > >
> >
> > BTW, shouldn't we update leader_pid column in pg_stat_activity as well
> > to display apply leader PID for parallel apply workers? It will
> > currently display for other parallel operations like a parallel
> > vacuum, so I don't see a reason to not do the same for parallel apply
> > workers.
> 
> +1
> 
> The parallel apply workers have different properties than the parallel query
> workers since they execute different transactions and don't use group locking
> but it would be a good hint for users to show the leader and parallel apply
> worker processes are related. If users want to check only parallel query 
> workers
> they can use the backend_type column.

Agreed, and changed as suggested.

Attached is the new version of the patch set, which addresses the comments so far.

Best Regards,
Hou zj


v80-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
v80-0001-Display-the-leader-apply-worker-s-PID-for-parall.patch
v80-0002-Stop-extra-worker-if-GUC-was-changed.patch
v80-0003-Add-GUC-stream_serialize_threshold-and-test-seri.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-13 Thread houzj.f...@fujitsu.com
On Friday, January 13, 2023 2:20 PM Peter Smith  wrote:
> 
> Here are some review comments for patch v79-0002.

Thanks for your comments.

> ==
> 
> General
> 
> 1.
> 
> I saw that earlier in this thread Hou-san [1] and Amit [2] also seemed to say
> there is not much point for this patch.
> 
> So I wanted to +1 that same opinion.
> 
> I feel this patch just adds more complexity for almost no gain:
> - reducing the 'max_apply_workers_per_subscription' seems not very
> common in the first place.
> - even when the GUC is reduced, at that point in time all the workers might 
> be in
> use so there may be nothing that can be immediately done.
> - IIUC the excess workers (for a reduced GUC) are going to get freed naturally
> anyway over time as more transactions are completed so the pool size will
> reduce accordingly.

I need to think it over, and we can have a detailed discussion after the first
patch is committed. So I haven't addressed the comments for 0002 for now.

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-12 Thread houzj.f...@fujitsu.com
On Thursday, January 12, 2023 12:24 PM Peter Smith  
wrote:
> 
> Hi, here are some review comments for patch v78-0001.

Thanks for your comments.

> ==
> 
> General
> 
> 1. (terminology)
> 
> AFAIK everywhere until now we’ve been referring everywhere
> (docs/comments/code) to the parent apply worker as the "leader apply
> worker". Not the "main apply worker". Not the "apply leader worker".
> Not any other variations...
> 
> From this POV I think the worker member "apply_leader_pid" would be better
> named "leader_apply_pid",  but I see that this was already committed to
> HEAD differently.
> 
> Maybe it is not possible (or you don't want) to change that internal member
> name but IMO at least all the new code and docs should try to be using
> consistent terminology (e.g. leader_apply_XXX) where possible.
> 
> ==
> 
> Commit message
> 
> 2.
> 
> main_worker_pid is Process ID of the leader apply worker, if this process is a
> apply parallel worker. NULL if this process is a leader apply worker or a
> synchronization worker.
> 
> IIUC, this text is just cut/paste from the monitoring.sgml. In a review 
> comment
> below I suggest some changes to that text, so then this commit message
> should also change to be the same.

Changed.

> ~~
> 
> 3.
> 
> The new column can make it easier to distinguish leader apply worker and
> apply parallel worker which is also similar to the 'leader_pid' column in
> pg_stat_activity.
> 
> SUGGESTION
> The new column makes it easier to distinguish parallel apply workers from
> other kinds of workers. It is implemented this way to be similar to the
> 'leader_pid' column in pg_stat_activity.

Changed.

> ==
> 
> doc/src/sgml/logical-replication.sgml
> 
> 4.
> 
> +   being synchronized. Moreover, if the streaming transaction is applied in
> +   parallel, there will be additional workers.
> 
> SUGGESTION
> there will be additional workers -> there may be additional parallel apply
> workers

Changed.

> ==
> 
> doc/src/sgml/monitoring.sgml
> 
> 5. pg_stat_subscription
> 
> @@ -3198,11 +3198,22 @@ SELECT pid, wait_event_type, wait_event FROM
> pg_stat_activity WHERE wait_event i
> 
>   
>
> +   apply_leader_pid integer
> +  
> +  
> +   Process ID of the leader apply worker, if this process is a apply
> +   parallel worker. NULL if this process is a leader apply worker or a
> +   synchronization worker.
> +  
> + 
> +
> + 
> +  
> relid oid
>
>
> OID of the relation that the worker is synchronizing; null for the
> -   main apply worker
> +   main apply worker and the parallel apply worker
>
>   
> 
> 5a.
> 
> (Same as general comment #1 about terminology)
> 
> "apply_leader_pid" --> "leader_apply_pid"

I changed this and all related stuff to "leader_pid" as I agree with Amit that
this might be useful for future features and is more consistent with the
leader_pid in pg_stat_activity.

> 
> ~~
> 
> 5b.
> 
> The current text feels awkward. I see it was copied from the similar text of
> 'pg_stat_activity' but perhaps it can be simplified a bit.
> 
> SUGGESTION
> Process ID of the leader apply worker if this process is a parallel apply 
> worker;
> otherwise NULL.

I slightly adjusted this according to Amit's suggestion, which I think provides
more information:

"Process ID of the leader apply worker, if this process is a parallel apply
worker. NULL if this process is a leader apply worker or does not participate in
parallel apply, or a synchronization worker."

> ~~
> 
> 5c.
> BEFORE
> null for the main apply worker and the parallel apply worker
> 
> AFTER
> null for the leader apply worker and parallel apply workers

Changed.

> ~~
> 
> 5c.
> 
> relid oid
>
>
> OID of the relation that the worker is synchronizing; null for the
> -   main apply worker
> +   main apply worker and the parallel apply worker
>
> 
> 
> main apply worker -> leader apply worker
> 

Changed.

> ~~~
> 
> 6.
> 
> @@ -3212,7 +3223,7 @@ SELECT pid, wait_event_type, wait_event FROM
> pg_stat_activity WHERE wait_event i
>
>
> Last write-ahead log location received, the initial value of
> -   this field being 0
> +   this field being 0; null for the parallel apply worker
>
>   
> 
> BEFORE
> null for the parallel apply worker
> 
> AFTER
> null for parallel apply workers
> 

Changed.

> ~~~
> 
> 7.
> 
> @@ -3221,7 +3232,8 @@ SELECT pid, wait_event_type, wait_event FROM
> pg_stat_activity WHERE wait_event i
> last_msg_send_time timestamp
> with time zone
>
>
> -   Send time of last message received from origin WAL sender
> +   Send time of last message received from origin WAL sender; null for
> the
> +   parallel apply worker
>
>   
> 
> (same as #6)
> 
> BEFORE
> null for the parallel apply worker
> 
> AFTER
> null for parallel apply workers
> 

Changed.

> ~~~

RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-12 Thread houzj.f...@fujitsu.com
On Thursday, January 12, 2023 7:08 PM Amit Kapila  
wrote:
> 
> On Thu, Jan 12, 2023 at 4:21 PM shveta malik  wrote:
> >
> > On Thu, Jan 12, 2023 at 10:34 AM Amit Kapila 
> wrote:
> > >
> > > On Thu, Jan 12, 2023 at 9:54 AM Peter Smith 
> wrote:
> > > >
> > > >
> > > > doc/src/sgml/monitoring.sgml
> > > >
> > > > 5. pg_stat_subscription
> > > >
> > > > @@ -3198,11 +3198,22 @@ SELECT pid, wait_event_type, wait_event
> > > > FROM pg_stat_activity WHERE wait_event i
> > > >
> > > >   
> > > > > > > role="column_definition">
> > > > +   apply_leader_pid
> integer
> > > > +  
> > > > +  
> > > > +   Process ID of the leader apply worker, if this process is a 
> > > > apply
> > > > +   parallel worker. NULL if this process is a leader apply worker 
> > > > or a
> > > > +   synchronization worker.
> > > > +  
> > > > + 
> > > > +
> > > > + 
> > > > +   > > > + role="column_definition">
> > > > relid oid
> > > >
> > > >
> > > > OID of the relation that the worker is synchronizing; null for 
> > > > the
> > > > -   main apply worker
> > > > +   main apply worker and the parallel apply worker
> > > >
> > > >   
> > > >
> > > > 5a.
> > > >
> > > > (Same as general comment #1 about terminology)
> > > >
> > > > "apply_leader_pid" --> "leader_apply_pid"
> > > >
> > >
> > > How about naming this as just leader_pid? I think it could be
> > > helpful in the future if we decide to parallelize initial sync (aka
> > > parallel
> > > copy) because then we could use this for the leader PID of parallel
> > > sync workers as well.
> > >
> > > --
> >
> > I still prefer leader_apply_pid.
> > leader_pid does not tell which 'operation' it belongs to. 'apply'
> > gives the clarity that it is apply related process.
> >
> 
> But then do you suggest that tomorrow if we allow parallel sync workers then
> we have a separate column leader_sync_pid? I think that doesn't sound like a
> good idea and moreover one can refer to docs for clarification.

I agree that leader_pid would be better, not only for a future parallel copy sync
feature, but also because it's more consistent with the leader_pid column in
pg_stat_activity.

Attached is the new version of the patch, which addresses Peter's comments and
renames all the related stuff to leader_pid.

Best Regards,
Hou zj


v79-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
v79-0001-Add-leader_pid-to-pg_stat_subscription.patch
v79-0002-Stop-extra-worker-if-GUC-was-changed.patch
v79-0003-Add-GUC-stream_serialize_threshold-and-test-seri.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-10 Thread houzj.f...@fujitsu.com
On Tuesday, January 10, 2023 7:48 PM Dilip Kumar  wrote:
> 
> On Tue, Jan 10, 2023 at 10:26 AM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Monday, January 9, 2023 4:51 PM Amit Kapila 
> wrote:
> > >
> > > On Sun, Jan 8, 2023 at 11:32 AM houzj.f...@fujitsu.com
> > >  wrote:
> > > >
> > > > On Sunday, January 8, 2023 11:59 AM houzj.f...@fujitsu.com
> > >  wrote:
> > > > > Attach the updated patch set.
> > > >
> > > > Sorry, the commit message of 0001 was accidentally deleted, just
> > > > attach the same patch set again with commit message.
> > > >
> > >
> > > Pushed the first (0001) patch.
> >
> > Thanks for pushing, here are the remaining patches.
> > I reordered the patch number to put patches that are easier to commit
> > in the front of others.
> 
> I was looking into 0001, IMHO the pid should continue to represent the main
> apply worker. So the pid will always show the main apply worker which is
> actually receiving all the changes for the subscription (in short working as
> logical receiver) and if it is applying changes through a parallel worker 
> then it
> should put the parallel worker pid in a new column called 
> 'parallel_worker_pid'
> or 'parallel_apply_worker_pid' otherwise NULL.  Thoughts?

Thanks for the comment.

IIUC, you mean something like the following, right?
(sorry if I misunderstood)
--
For a parallel apply worker:
the 'pid' column shows the pid of the leader, and the new column
parallel_worker_pid shows its own pid

For the leader apply worker:
the 'pid' column shows its own pid, and the new column parallel_worker_pid shows 0
--

If so, I am not sure the above is better, because it changes the meaning of the
existing 'pid' column: 'pid' would no longer represent the pid of
the worker itself. Besides, it seems inconsistent with what we have for
parallel query workers in pg_stat_activity. What do you think?
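
To make the comparison concrete, here is a small sketch of the semantics the patch keeps, where 'pid' is always the worker's own pid and the leader column points back to the leader. It is an illustration only: the WorkerRow struct and both helpers are hypothetical, and 0 stands in for SQL NULL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical in-memory model of one row of the statistics view. */
typedef struct WorkerRow
{
    int pid;          /* always the worker's own process ID */
    int leader_pid;   /* leader's PID for a parallel apply worker, else 0 (NULL) */
} WorkerRow;

/* A row describes a parallel apply worker iff it points at a leader. */
bool
is_parallel_apply_worker(const WorkerRow *row)
{
    assert(row != NULL);
    return row->leader_pid != 0;
}

/* Count the parallel apply workers attached to the given leader. */
size_t
count_parallel_workers(const WorkerRow *rows, size_t nrows, int leader)
{
    size_t count = 0;

    for (size_t i = 0; i < nrows; i++)
    {
        if (rows[i].leader_pid == leader)
            count++;
    }
    return count;
}
```

Because 'pid' keeps its meaning, a row can still be matched against a backend by pid alone, and grouping by the leader column collects a leader's workers, much like leader_pid does for parallel query in pg_stat_activity.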

Best regards,
Hou zj




RE: releasing ParallelApplyTxnHash when pa_launch_parallel_worker returns NULL

2023-01-10 Thread houzj.f...@fujitsu.com
On Wednesday, January 11, 2023 10:21 AM Ted Yu  wrote:
> /* First time through, initialize parallel apply worker state 
> hashtable. */
> if (!ParallelApplyTxnHash)
> 
> I think it would be better if `ParallelApplyTxnHash` is created by the first
> successful parallel apply worker.

Thanks for the suggestion. But I am not sure it's worth changing the
order here, because it would only optimize the case where the user enables parallel
apply but never gets an available worker, which should be rare. And in such a
case, it would be better to increase the number of workers or disable parallel
mode.

Best Regards,
Hou zj


RE: releasing ParallelApplyTxnHash when pa_launch_parallel_worker returns NULL

2023-01-10 Thread houzj.f...@fujitsu.com
On Wednesday, January 11, 2023 1:25 AM Ted Yu  wrote:

> I was reading src/backend/replication/logical/applyparallelworker.c .
> In `pa_allocate_worker`, when pa_launch_parallel_worker returns NULL, I think 
> the `ParallelApplyTxnHash` should be released.

Thanks for reporting.

ParallelApplyTxnHash is used to cache the state of streaming transactions being
applied. There could be multiple streaming transactions being applied in
parallel whose information has already been saved in ParallelApplyTxnHash, so
we should not release it just because we don't have a worker available to
handle a new transaction here.
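
A toy model of that reasoning, for illustration only: TxnTable, txn_table_add(), txn_table_count() and allocate_worker() are hypothetical stand-ins rather than the real hash table API. The point is just that a failed launch for a new transaction must leave the entries of already-running transactions alone.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_TXNS 8

/* Hypothetical entry: which worker applies which streamed transaction. */
typedef struct TxnEntry
{
    unsigned xid;
    int      worker_id;
    bool     used;
} TxnEntry;

typedef struct TxnTable
{
    TxnEntry entries[MAX_TXNS];
} TxnTable;

/* Record a transaction; returns false if the table is full. */
bool
txn_table_add(TxnTable *table, unsigned xid, int worker_id)
{
    assert(table != NULL);
    for (size_t i = 0; i < MAX_TXNS; i++)
    {
        if (!table->entries[i].used)
        {
            table->entries[i] = (TxnEntry) {xid, worker_id, true};
            return true;
        }
    }
    return false;
}

/* Number of transactions currently tracked. */
size_t
txn_table_count(const TxnTable *table)
{
    size_t count = 0;

    for (size_t i = 0; i < MAX_TXNS; i++)
    {
        if (table->entries[i].used)
            count++;
    }
    return count;
}

/* free_worker_id < 0 models a failed worker launch. */
bool
allocate_worker(TxnTable *table, unsigned xid, int free_worker_id)
{
    if (free_worker_id < 0)
        return false;   /* no worker: existing entries stay untouched */
    return txn_table_add(table, xid, free_worker_id);
}
```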

Best Regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-09 Thread houzj.f...@fujitsu.com
On Monday, January 9, 2023 4:51 PM Amit Kapila  wrote:
> 
> On Sun, Jan 8, 2023 at 11:32 AM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Sunday, January 8, 2023 11:59 AM houzj.f...@fujitsu.com
>  wrote:
> > > Attach the updated patch set.
> >
> > Sorry, the commit message of 0001 was accidentally deleted, just
> > attach the same patch set again with commit message.
> >
> 
> Pushed the first (0001) patch.

Thanks for pushing. Here are the remaining patches.
I renumbered the patches so that the ones that are easier to
commit come first.

Best regards,
Hou zj



v78-0004-Retry-to-apply-streaming-xact-only-in-apply-work.patch
v78-0001-Add-a-main_worker_pid-to-pg_stat_subscription.patch
v78-0002-Stop-extra-worker-if-GUC-was-changed.patch
v78-0003-Add-GUC-stream_serialize_threshold-and-test-seri.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-09 Thread houzj.f...@fujitsu.com
On Monday, January 9, 2023 5:32 PM Shinoda, Noriyoshi (PN Japan FSIP) 
 wrote:
> 
> Hi, Thanks for the great new feature.
> 
> Applied patches include adding wait events LogicalParallelApplyMain,
> LogicalParallelApplyStateChange.
> However, it seems that monitoring.sgml only contains descriptions for
> pg_locks. The attached patch adds relevant wait event information.
> Please update if you have a better description.

Thanks for reporting. I think for LogicalParallelApplyStateChange we'd better
document it in a consistent style with LogicalSyncStateChange, so I have
slightly adjusted the patch for the same.

Best regards,
Hou zj



v2-0001-document-the-newly-added-wait-event.patch


RE: Notify downstream to discard the streamed transaction which was aborted due to crash.

2023-01-05 Thread houzj.f...@fujitsu.com
On Friday, January 6, 2023 1:15 PM Amit Kapila  wrote:
> 
> On Fri, Jan 6, 2023 at 9:25 AM houzj.f...@fujitsu.com 
> wrote:
> >
> >
> > To fix it, One idea is to send a stream abort message when we are
> > cleaning up crashed transaction on publisher(e.g. in
> > ReorderBufferAbortOld()). And here is a tiny patch which changes the
> > same. I have confirmed that the bug is fixed and all regression tests
> > pass. I didn't add a testcase because we need to make sure the crash
> > happens before all the WAL logged transactions data are decoded which
> doesn't seem easy to write a stable test for this.
> >
> 
> Your fix looks good to me. Have you tried this in PG-14 where it was
> introduced?

Yes, I have confirmed that PG-14 has the same problem and that it is fixed by
applying the patch.

Best regards,
Hou zj


Notify downstream to discard the streamed transaction which was aborted due to crash.

2023-01-05 Thread houzj.f...@fujitsu.com
Hi,

While developing another feature, I found an existing bug that was reported by
Dilip[1].

Currently, when decoding and streaming a transaction that was aborted due to a
crash, it's possible that we send a streaming block without sending an
end-of-stream message (stream abort), because we might not have WAL-logged an
XLOG_XACT_ABORT for such a crashed transaction. This causes the subscriber (with
streaming=on) to create a stream file that is not deleted until the apply
worker restarts.

BUG repro(borrowed from Dilip):
---
1. start 2 servers(config: logical_decoding_work_mem=64kB)
./pg_ctl -D data/ -c -l pub_logs start
./pg_ctl -D data1/ -c -l sub_logs start

2. Publisher:
create table t(a int PRIMARY KEY ,b text);
create publication test_pub for table t
with(PUBLISH='insert,delete,update,truncate');
alter table t replica identity FULL ;

3. Subscription Server:
create table t(a int,b text);
create subscription test_sub CONNECTION 'host=localhost port=1
dbname=postgres' PUBLICATION test_pub WITH ( slot_name =
test_slot_sub1,streaming=on);

4. Publication Server:
begin ;
insert into t values (generate_series(1,5), 'z');  -- (while 
executing this restart publisher in 2-3 secs)

Restart the publication server, while the transaction is still in an
uncommitted state.
./pg_ctl -D data/ -c -l pub_logs restart -mi
---

After restarting the publisher, we can see that the subscriber receives a streaming
block and creates a stream file (/base/pgsql_tmp/xxx.fileset).

To fix it, one idea is to send a stream abort message when we are cleaning up a
crashed transaction on the publisher (e.g. in ReorderBufferAbortOld()). Attached is
a tiny patch that does this. I have confirmed that the bug is fixed and
all regression tests pass. I didn't add a test case because we would need to make
sure the crash happens before all of the transaction's WAL-logged data is decoded,
and it doesn't seem easy to write a stable test for that.

Thoughts ?

[1] 
https://www.postgresql.org/message-id/CAFiTN-sTYk%3Dh75Jn1a7ee%2B5hOcdQFjKGDvF_0NWQQXmoBv4A%2BA%40mail.gmail.com

Best regards,
Hou zj


0001-fix-stream-changes-for-crashed-transaction.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2023-01-05 Thread houzj.f...@fujitsu.com
On Thursday, January 5, 2023 4:22 PM Dilip Kumar  wrote:
> 
> On Thu, Jan 5, 2023 at 9:07 AM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Wednesday, January 4, 2023 9:29 PM Dilip Kumar
>  wrote:
> 
> > > I think this looks good to me.
> >
> > Thanks for the comments.
> > Attach the new version patch set which changed the comments as
> suggested.
> 
> Thanks for the updated patch, while testing this I see one strange
> behavior which seems like bug to me, here is the step to reproduce
> 
> 1. start 2 servers(config: logical_decoding_work_mem=64kB)
> ./pg_ctl -D data/ -c -l pub_logs start
> ./pg_ctl -D data1/ -c -l sub_logs start
> 
> 2. Publisher:
> create table t(a int PRIMARY KEY ,b text);
> CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS
> 'select array_agg(md5(g::text))::text from generate_series(1, 256) g';
> create publication test_pub for table t
> with(PUBLISH='insert,delete,update,truncate');
> alter table t replica identity FULL ;
> insert into t values (generate_series(1,2000),large_val()) ON CONFLICT
> (a) DO UPDATE SET a=EXCLUDED.a*300;
> 
> 3. Subscription Server:
> create table t(a int,b text);
> create subscription test_sub CONNECTION 'host=localhost port=5432
> dbname=postgres' PUBLICATION test_pub WITH ( slot_name =
> test_slot_sub1,streaming=parallel);
> 
> 4. Publication Server:
> begin ;
> savepoint a;
> delete from t;
> savepoint b;
> insert into t values (generate_series(1,5000),large_val()) ON CONFLICT
> (a) DO UPDATE SET a=EXCLUDED.a*3;  -- (while executing this start
> publisher in 2-3 secs)
> 
> Restart the publication server, while the transaction is still in an
> uncommitted state.
> ./pg_ctl -D data/ -c -l pub_logs stop -mi
> ./pg_ctl -D data/ -c -l pub_logs start -mi
> 
> after this, the parallel apply worker stuck in waiting on stream lock
> forever (even after 10 mins) -- see below, from subscriber logs I can
> see one of the parallel apply worker [75677] started but never
> finished [no error], after that I have performed more operation [same
> insert] which got applied by new parallel apply worked which got
> started and finished within 1 second.
> 

Thanks for reporting the problem.

After analyzing the behavior, I think it's a bug on the publisher side which
is not directly related to parallel apply.

I think the root cause is that we don't send a stream end (stream
abort) message to the subscriber for a crashed transaction that was streamed
before.

The behavior is that, after restarting, the publisher starts to decode the
transaction that was aborted due to the crash, and when it tries to stream the
first change of that transaction, it sends a stream start message but then realizes
that the transaction was aborted, so it enters the PG_CATCH block of
ReorderBufferProcessTXN() and calls ReorderBufferResetTXN(), which sends the
stream stop message. In this case, there would be a parallel apply worker
started on the subscriber waiting for a stream end message which will never come.

I think the same behavior happens in non-parallel mode, which leaves
a stream file on the subscriber that is not cleaned up until the apply worker is
restarted.

To fix it, I think we need to send a stream abort message when we are cleaning
up a crashed transaction on the publisher (e.g., in ReorderBufferAbortOld()).
Attached is a tiny patch that does this. I have confirmed that the bug is fixed
and all regression tests pass.

What do you think?
I will start a new thread and try to write a test case if possible
after reaching a consensus.
Best regards,
Hou zj


0001-fix-stream-changes-for-crashed-transaction.patch


RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-20 Thread houzj.f...@fujitsu.com

On Monday, December 19, 2022 8:47 PM Amit Kapila wrote:
> 
> On Sat, Dec 17, 2022 at 7:34 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > Agreed. I have addressed all the comments and did some cosmetic changes.
> > Attach the new version patch set.
> >
> 
> Few comments:
> 
> 1.
> + if (fileset_state == FS_SERIALIZE_IN_PROGRESS) {
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> + pa_unlock_stream(MyParallelShared->xid, AccessShareLock); }
> +
> + /*
> + * We cannot read the file immediately after the leader has serialized
> + all
> + * changes to the file because there may still be messages in the
> + memory
> + * queue. We will apply all spooled messages the next time we call this
> + * function, which should ensure that there are no messages left in the
> + * memory queue.
> + */
> + else if (fileset_state == FS_SERIALIZE_DONE) {
> 
> Once we have waited in the FS_SERIALIZE_IN_PROGRESS, the file state can be
> FS_SERIALIZE_DONE immediately after that. So, won't it be better to have a
> separate if block for FS_SERIALIZE_DONE state? If you agree to do so then we
> can probably remove the comment: "* XXX It is possible that immediately after
> we have waited for a lock in ...".

Changed, and slightly adjusted the comments.

> 2.
> +void
> +pa_decr_and_wait_stream_block(void)
> +{
> + Assert(am_parallel_apply_worker());
> +
> + if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count,
> + 1) == 0)
> 
> I think here the count can go negative when we are in serialize mode because
> we don't increase it for serialize mode. I can't see any problem due to that 
> but
> OTOH, this doesn't seem to be intended because in the future if we decide to
> implement the functionality of switching back to non-serialize mode, this 
> could
> be a problem. Also, I guess we don't even need to try locking/unlocking the
> stream lock in that case.
> One idea to avoid this is to check if the pending count is zero then if 
> file_set in
> not available raise an error (elog ERROR), otherwise, simply return from here.

Added the check.

> 
> 3. In apply_handle_stream_stop(), we are setting backendstate as idle for 
> cases
> TRANS_LEADER_SEND_TO_PARALLEL and TRANS_PARALLEL_APPLY. For other
> cases, it is set by stream_stop_internal. I think it would be better to set 
> the state
> explicitly for all cases to make the code look consistent and remove it from
> stream_stop_internal(). The other reason to remove setting the state from
> stream_stop_internal() is that when that function is invoked from other places
> like apply_handle_stream_commit(), it seems to be setting the idle before
> actually we reach the idle state.

Changed. Besides, I noticed that the pgstat_report_activity call in pa_stream_abort
for subtransactions is unnecessary, since the state should be consistent with the
state set at the last stream_stop, so I have removed that as well.

> 
> 4. Apart from the above, I have made a few changes in the comments, see
> attached.

Thanks, I have merged the patch.

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-16 Thread houzj.f...@fujitsu.com
On Friday, December 16, 2022 3:08 PM Masahiko Sawada  
wrote:
> 
> 
>Here are some minor comments:

Thanks for the comments!

> ---
> +pa_has_spooled_message_pending()
> +{
> +   PartialFileSetState fileset_state;
> +
> +   fileset_state = pa_get_fileset_state();
> +
> +   if (fileset_state != FS_UNKNOWN)
> +   return true;
> +   else
> +   return false;
> +}
> 
> I think we can simply do:
> 
> return (fileset_state != FS_UNKNOWN);

Will change.

> 
> Or do we need this function in the first place? I think we can do in
> LogicalParallelApplyLoop() like:

I intended not to expose the file state in the main loop, so it may be better
to keep this function.

> ---
> +   active_workers = list_copy(ParallelApplyWorkerPool);
> +
> +   foreach(lc, active_workers)
> +   {
> +   int slot_no;
> +   uint16  generation;
> +   ParallelApplyWorkerInfo *winfo =
> (ParallelApplyWorkerInfo *) lfirst(lc);
> +
> +   LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +   napplyworkers =
> logicalrep_pa_worker_count(MyLogicalRepWorker->subid);
> +   LWLockRelease(LogicalRepWorkerLock);
> +
> +   if (napplyworkers <=
> max_parallel_apply_workers_per_subscription / 2)
> +   return;
> +
> 
> Calling logicalrep_pa_worker_count() with lwlock for each worker seems
> not efficient to me. I think we can get the number of workers once at
> the top of this function and return if it's already lower than the
> maximum pool size. Otherwise, we attempt to stop extra workers.

How about directly checking the length of the worker pool list here, which
seems simpler and doesn't need a lock?
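
A minimal sketch of that idea, assuming the pool length is read once up front: extra_workers_to_stop() is a hypothetical helper, and the "half of the maximum" target simply mirrors the condition in the quoted hunk.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper: given the current length of the worker pool list and
 * the configured maximum, return how many workers could be stopped so that
 * the pool shrinks to at most half of the maximum.  The length is computed
 * once by the caller, so no per-worker lock acquisition is needed.
 */
size_t
extra_workers_to_stop(size_t pool_len, size_t max_workers)
{
    size_t target = max_workers / 2;

    if (pool_len <= target)
        return 0;               /* pool is already small enough */

    assert(pool_len > target);
    return pool_len - target;
}
```

For example, with a maximum of 4 and 5 workers in the pool, up to 3 workers would be candidates for stopping; workers still busy with a transaction would of course have to be skipped by the caller.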

> ---
> +bool
> +pa_free_worker(ParallelApplyWorkerInfo *winfo, TransactionId xid)
> +{
> 
> 
> Is there any reason why this function has the XID as a separate
> argument? It seems to me that since we always call this function with
> 'winfo' and 'winfo->shared->xid', we can remove xid from the function
> argument.
> 
> ---
> +   /* Initialize shared memory area. */
> +   SpinLockAcquire(&winfo->shared->mutex);
> +   winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
> +   winfo->shared->xid = xid;
> +   SpinLockRelease(&winfo->shared->mutex);
> 
> It's practically no problem but is there any reason why some fields of
> ParallelApplyWorkerInfo are initialized in pa_setup_dsm() whereas some
> fields are done here?

We could be reusing an old worker from the pool here, in which case we need to
update these fields with the new streaming transaction's information.
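
A simplified, single-threaded sketch of that reuse case follows. The struct and function names are hypothetical, and the spinlock that the patch takes around these assignments is left out here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the shared per-worker state. */
typedef enum XactState
{
    XACT_UNKNOWN,
    XACT_FINISHED
} XactState;

typedef struct SharedInfo
{
    unsigned int xid;        /* transaction currently assigned */
    XactState    xact_state; /* progress of that transaction */
} SharedInfo;

typedef struct WorkerInfo
{
    SharedInfo shared;
    bool       in_use;
} WorkerInfo;

/*
 * Hand a pooled worker a new streaming transaction. The per-transaction
 * fields are reset on every assignment, because the worker may still
 * carry the values of the transaction it applied previously.
 */
static void
assign_worker(WorkerInfo *winfo, unsigned int xid)
{
    assert(winfo != NULL && !winfo->in_use);

    winfo->shared.xact_state = XACT_UNKNOWN;
    winfo->shared.xid = xid;
    winfo->in_use = true;
}
```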

I will address the other comments, except the ones above that are still being discussed.

Best regards,
Hou zj



RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-13 Thread houzj.f...@fujitsu.com
On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada wrote:
> 
> On Sun, Dec 11, 2022 at 8:45 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Friday, December 9, 2022 3:14 PM Amit Kapila
>  wrote:
> > >
> > > On Thu, Dec 8, 2022 at 12:37 PM houzj.f...@fujitsu.com
> > >  wrote:
> > > >
> > >
> > > Review comments
> >
> > Thanks for the comments!
> >
> > > ==
> > > > 1. Currently, we don't release the stream lock in LA (leader apply
> > > worker) for "rollback to savepoint" and the reason is mentioned in
> > > comments of
> > > apply_handle_stream_abort() in the patch. But, today, while testing,
> > > I found that can lead to deadlock which otherwise, won't happen on
> > > the publisher. The key point is rollback to savepoint releases the
> > > locks acquired by the particular subtransaction, so parallel apply
> > > worker should also do the same. Consider the following example where
> > > the transaction in session-1 is being performed by the parallel
> > > > apply worker and the transaction in session-2 is being performed by the
> > > > leader apply worker. I have simulated it by using GUC force_stream_mode.
> > > Publisher
> > > ==
> > > Session-1
> > > postgres=# begin;
> > > BEGIN
> > > postgres=*# savepoint s1;
> > > SAVEPOINT
> > > postgres=*# truncate t1;
> > > TRUNCATE TABLE
> > >
> > > Session-2
> > > postgres=# begin;
> > > BEGIN
> > > postgres=*# insert into t1 values(4);
> > >
> > > Session-1
> > > > postgres=*# rollback to savepoint s1;
> > > > ROLLBACK
> > >
> > > Session-2
> > > Commit;
> > >
> > > With or without commit of Session-2, this scenario will lead to
> > > deadlock on the subscriber because PA (parallel apply worker) is
> > > waiting for LA to send the next command, and LA is blocked by
> > > Exclusive of PA. There is no deadlock on the publisher because
> > > rollback to savepoint will release the lock acquired by truncate.
> > >
> > > To solve this, How about if we do three things before sending abort
> > > of sub-transaction (a) unlock the stream lock, (b) increment
> > > pending_stream_count,
> > > (c) take the stream lock again?
> > >
> > > Now, if the PA is not already waiting on the stop, it will not wait
> > > at stream_stop but will wait after applying abort of sub-transaction
> > > and if it is already waiting at stream_stop, the wait will be
> > > > released. If this works then probably we should try to do (b) before (a)
> > > > to match the steps with stream_start.
> >
> > The solution works for me, I have changed the code as suggested.
> >
> >
> > > 2. There seems to be another general problem in the way the patch
> > > waits for stream_stop in PA (parallel apply worker). Currently, PA
> > > checks, if there are no more pending streams then it tries to wait
> > > for the next stream by waiting on a stream lock. However, it is
> > > possible after PA checks there is no pending stream and before it
> > > actually starts waiting on a lock, the LA sends another stream for
> > > which even stream_stop is sent, in this case, PA will start waiting
> > > for the next stream whereas there is actually a pending stream
> > > available. In this case, it won't lead to any problem apart from
> > > > delay in applying the changes in such cases but for the case mentioned in
> > > > the previous point (Point 1), it can lead to deadlock even after we implement
> > > > the solution proposed to solve it.
> >
> > Thanks for reporting, I have introduced another flag in shared memory
> > and use it to prevent the leader from incrementing the
> > > pending_stream_count if the parallel apply worker is trying to lock the
> > > stream lock.
> >
> >
> > > 3. The other point to consider is that for
> > > stream_commit/prepare/abort, in LA, we release the stream lock after
> > > sending the message whereas for stream_start we release it before
> > > sending the message. I think for the earlier cases
> > > (stream_commit/prepare/abort), the patch has done like this because
> > > pa_send_data() may need to require the lock again when it times out
> > > and start serializing, so there will be no sense in first releasing
> > > it, then re-acquiring it, and then again releasing it. Can't we also
> > > release the lock f

RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-13 Thread houzj.f...@fujitsu.com
On Tue, Dec 13, 2022 7:06 AM Peter Smith  wrote:
> Some minor review comments for v58-0001

Thanks for your comments.

> ==
> 
> .../replication/logical/applyparallelworker.c
> 
> 1. pa_can_start
> 
> + /*
> + * Don't start a new parallel worker if user has set skiplsn as it's
> + * possible that user want to skip the streaming transaction. For streaming
> + * transaction, we need to serialize the transaction to a file so that we
> + * can get the last LSN of the transaction to judge whether to skip before
> + * starting to apply the change.
> + */
> + if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
> + return false;
> 
> 
> "that user want" -> "that they want"
> 
> "For streaming transaction," -> "For streaming transactions,"

Changed.

> ~~~
> 
> 2. pa_free_worker_info
> 
> + /* Remove from the worker pool. */
> + ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool,
> +winfo);
> 
> Unnecessary wrapping

Changed.

> ~~~
> 
> 3. pa_set_stream_apply_worker
> 
> +/*
> + * Set the worker that required to apply the current streaming transaction.
> + */
> +void
> +pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
> +{
> + stream_apply_worker = winfo;
> +}
> 
> Comment wording seems wrong.

Tried to improve this comment.

> ==
> 
> src/include/replication/worker_internal.h
> 
> 4. ParallelApplyWorkerShared
> 
> + * XactLastCommitEnd from the parallel apply worker. This is required to
> + * update the lsn_mappings by leader worker.
> + */
> + XLogRecPtr last_commit_end;
> +} ParallelApplyWorkerShared;
> 
> 
> "This is required to update the lsn_mappings by leader worker." --> 
> did you mean "This is required by the leader worker so it can update 
> the lsn_mappings." ??

Changed.

Also, thanks for the kind reminder in [1]; I have rebased the patch set.
Attached is the new patch set.

[1] - 
https://www.postgresql.org/message-id/CAHut%2BPt4qv7xfJUmwdn6Vy47L5mqzKtkPr31%3DDmEayJWXetvYg%40mail.gmail.com

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-07 Thread houzj.f...@fujitsu.com
On Wednesday, December 7, 2022 7:51 PM Masahiko Sawada wrote:
> 
> On Mon, Dec 5, 2022 at 1:29 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Sunday, December 4, 2022 7:17 PM houzj.f...@fujitsu.com
> 
> > >
> > > Thursday, December 1, 2022 8:40 PM Amit Kapila
> 
> > > wrote:
> > > > Some other comments:
> > > ...
> > > Attach the new version patch set which addressed most of the comments
> > > received so far except some comments being discussed[1].
> > > [1]
> https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com
> >
> > Attach a new version patch set which fixed a testcase failure on CFbot.
> 
> ---
> If a value of max_parallel_apply_workers_per_subscription is not
> sufficient, we get the LOG "out of parallel apply workers" every time
> when the apply worker doesn't launch a worker. But do we really need
> this log? It seems not consistent with
> max_sync_workers_per_subscription behavior. I think we can check if
> the number of running parallel workers is less than
> max_parallel_apply_workers_per_subscription before calling
> logicalrep_worker_launch(). What do you think?

(Sorry, I missed this comment in last email)

I personally feel that giving a hint might help users realize that
max_parallel_apply_workers_per_subscription is not enough for the current
workload, so that they can adjust the parameter. Otherwise, users might not
have an easy way to check if more workers are needed.

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-07 Thread houzj.f...@fujitsu.com
On Wednesday, December 7, 2022 7:51 PM Masahiko Sawada wrote:
> 
> On Mon, Dec 5, 2022 at 1:29 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Sunday, December 4, 2022 7:17 PM houzj.f...@fujitsu.com
> 
> > >
> > > Thursday, December 1, 2022 8:40 PM Amit Kapila
> 
> > > wrote:
> > > > Some other comments:
> > > ...
> > > Attach the new version patch set which addressed most of the comments
> > > received so far except some comments being discussed[1].
> > > [1]
> https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com
> >
> > Attach a new version patch set which fixed a testcase failure on CFbot.
> 
> Here are some comments on v56 0001, 0002 patches. Please ignore
> comments if you already incorporated them in v57.

Thanks for the comments!

> +static void
> +ProcessParallelApplyInterrupts(void)
> +{
> +CHECK_FOR_INTERRUPTS();
> +
> +if (ShutdownRequestPending)
> +{
> +ereport(LOG,
> +(errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> +MySubscription->name)));
> +
> +apply_worker_clean_exit(false);
> +}
> +
> +if (ConfigReloadPending)
> +{
> +ConfigReloadPending = false;
> +ProcessConfigFile(PGC_SIGHUP);
> +}
> +}
> 
> I personally think that we don't need to have a function to do only
> these few things.

I thought that introducing a new function makes the handling of worker-specific
interrupts similar to the existing ones, such as ProcessWalRcvInterrupts() in
walreceiver.c and HandlePgArchInterrupts() in pgarch.c.

> 
> Should we change the names to something like
> LOGICALREP_STREAM_PARALLEL?

Agreed, will change.

> ---
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
> + * acquire the lock on the remote transaction) -> LA
> 
> and
> 
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
> + * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
> + * lock) -> LA
> 
> "(waiting to acquire the lock on the remote transaction)" in the first
> example and "(waiting to acquire the stream lock)" in the second
> example is the same meaning, right? If so, I think we should use
> either term for consistency.

Will change.

> ---
> +bool   write_abort_info = (data->streaming == SUBSTREAM_PARALLEL);
> 
> I think that instead of setting write_abort_info every time when
> pgoutput_stream_abort() is called, we can set it once, probably in
> PGOutputData, at startup.

Since we already have a "stream" flag in PGOutputData, I am not sure it would
be better to introduce another flag for the same option.


> ---
> server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> options.proto.logical.proto_version =
> +server_version >= 16 ?
> LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> server_version >= 15 ?
> LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> server_version >= 14 ?
> LOGICALREP_PROTO_STREAM_VERSION_NUM :
> LOGICALREP_PROTO_VERSION_NUM;
> 
> Instead of always using the new protocol version, I think we can use
> LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> 'parallel'. That way, we don't need to change protocol version check
> logic in pgoutput.c and don't need to expose defGetStreamingMode().
> What do you think?

I think that some users can also use the new version number when trying to get
changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
that leaving the check for the new version number in place is fine.

Besides, I feel that even if we don't use the new version number, we still need
to use defGetStreamingMode to check whether parallel mode is in use, as we need
to send abort_lsn when parallel mode is in use. I might be missing something,
sorry for that. Can you please explain the idea a bit?
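
For reference, my reading of the suggestion is roughly the sketch below: request the newest protocol version only when streaming is 'parallel', and otherwise fall back to the two-phase version. The function name, the constant names and their numeric values are illustrative placeholders, and the version thresholds simply mirror the numbers shown in the quoted hunk.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative placeholders, not the definitions from the PostgreSQL headers. */
enum
{
    PROTO_VERSION = 1,
    PROTO_STREAM_VERSION = 2,
    PROTO_TWOPHASE_VERSION = 3,
    PROTO_STREAM_PARALLEL_VERSION = 4
};

/*
 * Pick the protocol version to request from the publisher. The parallel
 * version is requested only when the subscription actually uses
 * streaming = parallel.
 */
static int
choose_proto_version(int server_version, bool parallel_streaming)
{
    assert(server_version > 0);

    if (server_version >= 16 && parallel_streaming)
        return PROTO_STREAM_PARALLEL_VERSION;
    if (server_version >= 15)
        return PROTO_TWOPHASE_VERSION;
    if (server_version >= 14)
        return PROTO_STREAM_VERSION;
    return PROTO_VERSION;
}
```

Even with this selection, the publisher side would still need some way to know that parallel mode was requested, which is the point raised above about abort_lsn.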

> ---
> When max_parallel_apply_workers_per_subscription is changed to a value
> lower than the number of parallel worker running at that time, do we
> need to stop extra workers?

I think we can do this, like adding a check in the main loop of leader worker,
and check every time after reloading the conf. OTOH, we will also stop the worker
after finishing a transac

RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-06 Thread houzj.f...@fujitsu.com
On Tue, Dec 6, 2022 7:57 AM Peter Smith  wrote:
> Here are my review comments for patch v55-0002

Thanks for your comments.

> ==
> 
> .../replication/logical/applyparallelworker.c
> 
> 1. pa_can_start
> 
> @@ -276,9 +278,9 @@ pa_can_start(TransactionId xid)
>   /*
>   * Don't start a new parallel worker if user has set skiplsn as it's
>   * possible that user want to skip the streaming transaction. For
> - * streaming transaction, we need to spill the transaction to disk so that
> - * we can get the last LSN of the transaction to judge whether to skip
> - * before starting to apply the change.
> + * streaming transaction, we need to serialize the transaction to a file
> + * so that we can get the last LSN of the transaction to judge whether to
> + * skip before starting to apply the change.
>   */
>   if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
>   return false;
> 
> I think the wording change may belong in patch 0001 because it has 
> nothing to do with partial serializing.

Changed.

> ~~~
> 
> 2. pa_free_worker
> 
> + /*
> + * Stop the worker if there are enough workers in the pool.
> + *
> + * XXX The worker is also stopped if the leader apply worker needed to
> + * serialize part of the transaction data due to a send timeout. This is
> + * because the message could be partially written to the queue due to send
> + * timeout and there is no way to clean the queue other than resending the
> + * message until it succeeds. To avoid complexity, we directly stop the
> + * worker in this case.
> + */
> + if (winfo->serialize_changes ||
> + napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> 
> Don't need to say "due to send timeout" 2 times in 2 sentences.
> 
> SUGGESTION
> XXX The worker is also stopped if the leader apply worker needed to 
> serialize part of the transaction data due to a send timeout. This is 
> because the message could be partially written to the queue but there 
> is no way to clean the queue other than resending the message until it 
> succeeds. Directly stopping the worker avoids needing this complexity.

Changed.

> 4.
> 
>  /*
> + * Replay the spooled messages in the parallel apply worker if the leader apply
> + * worker has finished serializing changes to the file.
> + */
> +static void
> +pa_spooled_messages(void)
> 
> I'm not 100% sure of the logic, so IMO maybe the comment should say a 
> bit more about how this works:
> 
> Specifically, let's say there was some timeout and the LA needed to 
> write the spool file, then let's say the PA timed out and found itself 
> inside this function. Now, let's say the LA is still busy writing the 
> file -- so what happens next?
> 
> Does this function simply return, then the main PA loop waits again, 
> then it times out again, then PA finds itself back inside this 
> function again... and that keeps happening over and over until 
> eventually the spool file is found FS_READY? Some explanatory comments 
> might help.

Slightly changed the logic and comment here.

> ~
> 
> 5.
> 
> + /*
> + * Check if changes have been serialized to a file. if so, read and apply
> + * them.
> + */
> + SpinLockAcquire(&MyParallelShared->mutex);
> + fileset_state = MyParallelShared->fileset_state;
> + SpinLockRelease(&MyParallelShared->mutex);
> 
> "if so" -> "If so"

Changed.

> ~~~
> 
> 
> 6. pa_send_data
> 
> + *
> + * If the attempt to send data via shared memory times out, then we will switch
> + * to "PARTIAL_SERIALIZE mode" for the current transaction to prevent possible
> + * deadlocks with another parallel apply worker (refer to the comments atop
> + * applyparallelworker.c for details). This means that the current data and any
> + * subsequent data for this transaction will be serialized to a file.
>   */
>  void
>  pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
> 
> SUGGESTION (minor comment rearranging)
> 
> If the attempt to send data via shared memory times out, then we will 
> switch to "PARTIAL_SERIALIZE mode" for the current transaction -- this 
> means that the current data and any subsequent data for this 
> transaction will be serialized to a file. This is done to prevent 
> possible deadlocks with another parallel apply worker (refer to the 
> comments atop applyparallelworker.c for details).

Changed.

> ~
> 
> 7.
> 
> + /*
> + * Take the stream lock to make sure that the parallel apply worker
> + * will wait for the leader to release the stream lock until the
> + * end of the transaction.
> + */
> + pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
> 
> The comment doesn't sound right.
> 
> "until the end" -> "at the end" (??)

I think it means "PA wait ... until the end of transaction".

> ~~~
> 
> 8. pa_stream_abort
> 
> @@ -1374,6 +1470,7 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
>   RollbackToSavepoint(spname);
>   CommitTransactionCommand();
>   subxactlist = list_truncate(subxactlist, i + 1);
> +
>   break;
>   

RE: Avoid streaming the transaction which are skipped (in corner cases)

2022-12-04 Thread houzj.f...@fujitsu.com
On Saturday, December 3, 2022 7:37 PM Amit Kapila wrote:
> 
> On Tue, Nov 29, 2022 at 12:23 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Tuesday, November 29, 2022 12:08 PM Dilip Kumar
>  wrote:
> >
> > I have few comments about the patch.
> >
> > 1.
> >
> > 1.1.
> > -   /* For streamed transactions notify the remote node about the abort. */
> > -   if (rbtxn_is_streamed(txn))
> > -   rb->stream_abort(rb, txn, lsn);
> > +   /* the transaction which is being skipped shouldn't have been streamed */
> > +   Assert(!rbtxn_is_streamed(txn));
> >
> > 1.2
> > -   rbtxn_is_serialized(txn))
> > +   rbtxn_is_serialized(txn) &&
> > +   rbtxn_has_streamable_change(txn))
> > ReorderBufferStreamTXN(rb, toptxn);
> >
> > In the above two places, I think we should do the check for the
> > top-level transaction(e.g. toptxn) because the patch only set flag for
> > the top-level transaction.
> >
> 
> Among these, the first one seems okay because it will check both the
> transaction and its subtransactions from that path and none of those should be
> marked as streamed. I have fixed the second one in the attached patch.
> 
> > 2.
> >
> > +   /*
> > +* If there are any streamable changes getting queued then get the top
> > +* transaction and mark it has streamable change.  This is required for
> > +* streaming in-progress transactions, the in-progress transaction will
> > +* not be selected for streaming unless it has at least one streamable
> > +* change.
> > +*/
> > +   if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
> > +   change->action == REORDER_BUFFER_CHANGE_UPDATE ||
> > +   change->action == REORDER_BUFFER_CHANGE_DELETE ||
> > +   change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
> > +   change->action == REORDER_BUFFER_CHANGE_TRUNCATE)
> >
> > I think that a transaction that contains REORDER_BUFFER_CHANGE_MESSAGE
> > can also be considered as streamable. Is there a reason that we don't check
> > it here ?
> >
> 
> No, I don't see any reason not to do this check for
> REORDER_BUFFER_CHANGE_MESSAGE.
> 
> Apart from the above, I have slightly adjusted the comments in the attached.
> Do let me know what you think of the attached.

Thanks for updating the patch. It looks good to me.

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-04 Thread houzj.f...@fujitsu.com
On Friday, December 2, 2022 4:59 PM Peter Smith  wrote:
> 
> Here are my review comments for patch v54-0001.

Thanks for the comments!

> ==
> 
> FILE: .../replication/logical/applyparallelworker.c
> 
> 1b.
> 
> + *
> + * This file contains routines that are intended to support setting up, using
> + * and tearing down a ParallelApplyWorkerInfo which is required to communicate
> + * among leader and parallel apply workers.
> 
> "that are intended to support" -> "for"

I find the current wording consistent with the comments atop vacuumparallel.c
and execParallel.c, so I didn't change this one.

> 3. pa_setup_dsm
> 
> +/*
> + * Set up a dynamic shared memory segment.
> + *
> + * We set up a control region that contains a fixed-size worker info
> + * (ParallelApplyWorkerShared), a message queue, and an error queue.
> + *
> + * Returns true on success, false on failure.
> + */
> +static bool
> +pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
> 
> IMO that's confusing to say "fixed-sized worker info" when it's referring to
> the ParallelApplyWorkerShared structure and not the other
> ParallelApplyWorkerInfo.
> 
> Might be better to say:
> 
> "a fixed-size worker info (ParallelApplyWorkerShared)" -> "a fixed-size struct
> (ParallelApplyWorkerShared)"

ParallelApplyWorkerShared is also a kind of information shared between
workers, so I am fine with the current wording. Or maybe just "fixed-size info"?

> ~~~
> 
> 12. pa_clean_subtrans
> 
> +/* Reset the list that maintains subtransactions. */
> +void
> +pa_clean_subtrans(void)
> +{
> + subxactlist = NIL;
> +}
> 
> Maybe a more informative function name would be pa_reset_subxactlist()?

I thought the current name is more consistent with pa_start_subtrans.

> ~~~
> 
> 17. apply_handle_stream_stop
> 
> + case TRANS_PARALLEL_APPLY:
> + elog(DEBUG1, "applied %u changes in the streaming chunk",
> + parallel_stream_nchanges);
> +
> + /*
> + * By the time parallel apply worker is processing the changes in
> + * the current streaming block, the leader apply worker may have
> + * sent multiple streaming blocks. This can lead to parallel apply
> + * worker start waiting even when there are more chunk of streams
> + * in the queue. So, try to lock only if there is no message left
> + * in the queue. See Locking Considerations atop
> + * applyparallelworker.c.
> + */
> 
> SUGGESTION (minor rewording)
> 
> By the time the parallel apply worker is processing the changes in the current
> streaming block, the leader apply worker may have sent multiple streaming
> blocks. To prevent the parallel apply worker from waiting unnecessarily, try to
> lock only if there is no message left in the queue. See Locking Considerations
> atop applyparallelworker.c.
> 

Didn't change this one according to Amit's comment.

> 
> 21. apply_worker_clean_exit
> 
> I wasn't sure if calling this a 'clean' exit meant anything much.
> 
> How about:
> - apply_worker_proc_exit, or
> - apply_worker_exit

I thought "clean" means that the exit code is 0 (proc_exit(0)) and the exit is
not due to any ERROR. I am not sure if proc_exit or exit is better.

I have addressed other comments in the new version patch.

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-04 Thread houzj.f...@fujitsu.com
On Friday, December 2, 2022 7:27 PM Kuroda, Hayato/黒田 隼人 wrote:
> 
> Dear Hou,
> 
> Thanks for making the patch. Followings are my comments for v54-0003 and
> 0004.

Thanks for the comments!

> 
> 0003
> 
> pa_free_worker()
> 
> +   /* Unlink any files that were needed to serialize partial changes. */
> +   if (winfo->serialize_changes)
> +   stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
> +
> 
> I think this part is not needed, because the LA cannot reach here if
> winfo->serialize_changes is true. Moreover stream_cleanup_files() is done in
> pa_free_worker_info().

Removed.

> LogicalParallelApplyLoop()
> 
> The parallel apply worker wakes up every 0.1s even if we are in the
> PARTIAL_SERIALIZE mode. Do you have idea to reduce that?

The parallel apply worker usually will wait on the stream lock after entering
PARTIAL_SERIALIZE mode.

> ```
> +   pa_spooled_messages();
> ```
> 
> Comments are needed here, like "Changes may be serialize...".

Added.

> pa_stream_abort()
> 
> ```
> +   /*
> +* Reopen the file and set the file position to the saved
> +* position.
> +*/
> +   if (reopen_stream_fd)
> +   {
> +   char path[MAXPGPATH];
> +
> +   changes_filename(path, MyLogicalRepWorker->subid, xid);
> +   stream_fd = BufFileOpenFileSet(&MyParallelShared->fileset,
> +   path, O_RDONLY, false);
> +   BufFileSeek(stream_fd, fileno, offset, SEEK_SET);
> +   }
> ```
> 
> MyParallelShared->serialize_changes may be used instead of reopen_stream_fd.

This code has been removed.

> 
> ```
> +   /*
> +* It is possible that while sending this change to parallel apply
> +* worker we need to switch to serialize mode.
> +*/
> +   if (winfo->serialize_changes)
> +   pa_set_fileset_state(winfo->shared, FS_READY);
> ```
> 
> There are three same parts in the code, can we combine them to common part?

This code has been slightly refactored.

> apply_spooled_messages()
> 
> ```
> +   /*
> +* Break the loop if the parallel apply worker has finished applying
> +* the transaction. The parallel apply worker should have closed the
> +* file before committing.
> +*/
> +   if (am_parallel_apply_worker() &&
> +   MyParallelShared->xact_state == PARALLEL_TRANS_FINISHED)
> +   goto done;
> ```
> 
> I think pfree(buffer) and pfree(s2.data) should not be skipped.
> And this part should be at below "nchanges++;"

buffer and s2.data were allocated in the top-level transaction's context, so they
will be freed automatically soon afterwards, when handling STREAM COMMIT.

> 
> 0004
> 
> set_subscription_retry()
> 
> ```
> +   LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
> +AccessShareLock);
> +
> ```
> 
> I think AccessExclusiveLock should be acquired instead of AccessShareLock.
> In AlterSubscription(), LockSharedObject(AccessExclusiveLock) seems to be
> used.

Changed.

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-12-01 Thread houzj.f...@fujitsu.com
On Thursday, December 1, 2022 3:58 PM Masahiko Sawada wrote:
> 
> On Wed, Nov 30, 2022 at 10:51 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Wednesday, November 30, 2022 9:41 PM houzj.f...@fujitsu.com
>  wrote:
> > >
> > > On Tuesday, November 29, 2022 8:34 PM Amit Kapila
> > > > Review comments on v53-0001*
> > >
> > > Attach the new version patch set.
> >
> > Sorry, there were some mistakes in the previous patch set.
> > Here is the correct V54 patch set. I also ran pgindent for the patch set.
> >
> 
> Thank you for updating the patches. Here are random review comments for
> 0001 and 0002 patches.

Thanks for the comments!

> 
> ereport(ERROR,
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>  errmsg("logical replication parallel apply worker exited abnormally"),
>  errcontext("%s", edata.context)));
> 
> and
> 
> ereport(ERROR,
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>  errmsg("logical replication parallel apply worker exited because of subscription information change")));
> 
> I'm not sure ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is appropriate
> here. Given that parallel apply worker has already reported the error message
> with the error code, I think we don't need to set the errorcode for the logs
> from the leader process.
> 
> Also, I'm not sure the term "exited abnormally" is appropriate since we use it
> when the server crashes for example. I think ERRORs reported here don't mean
> that in general.

How about reporting "xxx worker exited due to error" ?

> ---
> if (am_parallel_apply_worker() && on_subinfo_change) {
> /*
>  * If a parallel apply worker exits due to the subscription
>  * information change, we notify the leader apply worker so that the
>  * leader can report more meaningful message in time and restart the
>  * logical replication.
>  */
> pq_putmessage('X', NULL, 0);
> }
> 
> and
> 
> ereport(ERROR,
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>  errmsg("logical replication parallel apply worker exited because of subscription information change")));
> 
> Do we really need an additional message in case of 'X'? When we call
> apply_worker_clean_exit with on_subinfo_change = true, we have reported the
> error message such as:
> 
> ereport(LOG,
> (errmsg("logical replication parallel apply worker for subscription
> \"%s\" will stop because of a parameter change",
> MySubscription->name)));
> 
> I think that reporting a similar message from the leader might not be
> meaningful for users.

The intention is to let the leader report a more meaningful message if a worker
exited due to a subinfo change. Otherwise, the leader is likely to report an
error like "lost connection ... to parallel apply worker" when trying to send
data via shared memory after the worker has exited. What do you think?

> ---
> -if (options->proto.logical.streaming &&
> -PQserverVersion(conn->streamConn) >= 14)
> -appendStringInfoString(&cmd, ", streaming 'on'");
> +if (options->proto.logical.streaming_str)
> +appendStringInfo(&cmd, ", streaming '%s'",
> +options->proto.logical.streaming_str);
> 
> and
> 
> +/*
> + * Assign the appropriate option value for streaming option according to
> + * the 'streaming' mode and the publisher's ability to support that mode.
> + */
> +if (server_version >= 16 &&
> +MySubscription->stream == SUBSTREAM_PARALLEL)
> +{
> +options.proto.logical.streaming_str = pstrdup("parallel");
> +MyLogicalRepWorker->parallel_apply = true;
> +}
> +else if (server_version >= 14 &&
> + MySubscription->stream != SUBSTREAM_OFF)
> +{
> +options.proto.logical.streaming_str = pstrdup("on");
> +MyLogicalRepWorker->parallel_apply = false;
> +}
> +else
> +{
> +options.proto.logical.streaming_str = NULL;
> +MyLogicalRepWorker->parallel_apply = false;
> +}
> 
> This change moves the code of adjustment of the streaming option based on
> the publisher server version from libpqwalreceiver.c to worker.c.
> On the other hand,

RE: Avoid streaming the transaction which are skipped (in corner cases)

2022-11-28 Thread houzj.f...@fujitsu.com
On Tuesday, November 29, 2022 12:08 PM Dilip Kumar wrote:

Hi,

> 
> On Mon, Nov 28, 2022 at 3:19 PM Dilip Kumar  wrote:
> >
> > On Mon, Nov 28, 2022 at 1:46 PM shiy.f...@fujitsu.com
> >  wrote:
> > >
> > > Thanks for your patch.
> > >
> > > I saw that the patch added a check when selecting largest
> > > transaction, but in addition to ReorderBufferCheckMemoryLimit(), the
> > > transaction can also be streamed in
> > > ReorderBufferProcessPartialChange(). Should we add the check in this
> function, too?
> > >
> > > diff --git a/src/backend/replication/logical/reorderbuffer.c
> > > b/src/backend/replication/logical/reorderbuffer.c
> > > index 9a58c4bfb9..108737b02f 100644
> > > --- a/src/backend/replication/logical/reorderbuffer.c
> > > +++ b/src/backend/replication/logical/reorderbuffer.c
> > > @@ -768,7 +768,8 @@
> ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN
> *txn,
> > >  */
> > > if (ReorderBufferCanStartStreaming(rb) &&
> > > !(rbtxn_has_partial_change(toptxn)) &&
> > > -   rbtxn_is_serialized(txn))
> > > +   rbtxn_is_serialized(txn) &&
> > > +   rbtxn_has_streamable_change(txn))
> > > > ReorderBufferStreamTXN(rb, toptxn);
> > > >  }
> >
> > You are right we need this in ReorderBufferProcessPartialChange() as
> > well.  I will fix this in the next version.
> 
> Fixed this in the attached patch.

Thanks for updating the patch.

I have a few comments about the patch.

1.

1.1.
-   /* For streamed transactions notify the remote node about the abort. */
-   if (rbtxn_is_streamed(txn))
-   rb->stream_abort(rb, txn, lsn);
+   /* the transaction which is being skipped shouldn't have been streamed */
+   Assert(!rbtxn_is_streamed(txn));

1.2
-   rbtxn_is_serialized(txn))
+   rbtxn_is_serialized(txn) &&
+   rbtxn_has_streamable_change(txn))
ReorderBufferStreamTXN(rb, toptxn);

In the above two places, I think we should do the check on the top-level
transaction (e.g. toptxn), because the patch only sets the flag for the
top-level transaction.
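
To make the point concrete, here is a minimal sketch. It is not the patch
itself: DemoTxn and the demo_* functions are hypothetical, simplified stand-ins
for ReorderBufferTXN and the rbtxn_* checks, and it assumes the flag is only
ever set on the top-level transaction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for ReorderBufferTXN. */
typedef struct DemoTxn
{
	struct DemoTxn *toplevel;	/* NULL if this txn is itself top-level */
	bool		has_streamable_change;	/* only set on the top-level txn */
	bool		is_serialized;
} DemoTxn;

/* Return the top-level transaction of txn (txn itself if it has no parent). */
DemoTxn *
demo_toptxn(DemoTxn *txn)
{
	return txn->toplevel != NULL ? txn->toplevel : txn;
}

/* Record a streamable change: the flag goes on the top-level transaction. */
void
demo_mark_streamable(DemoTxn *txn)
{
	demo_toptxn(txn)->has_streamable_change = true;
}

/*
 * Decide whether streaming may start.  The serialized state is read from
 * txn, but the streamable flag must be read from the top-level transaction,
 * because a subtransaction never carries it.
 */
bool
demo_can_stream(DemoTxn *txn)
{
	return txn->is_serialized &&
		demo_toptxn(txn)->has_streamable_change;
}
```

With this layout, checking the flag on a subtransaction directly would always
see false, which is why the check should go through the top-level transaction.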

2.

+   /*
+* If there are any streamable changes getting queued then get the top
+* transaction and mark it has streamable change.  This is required for
+* streaming in-progress transactions, the in-progress transaction will
+* not be selected for streaming unless it has at least one streamable
+* change.
+*/
+   if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+   change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+   change->action == REORDER_BUFFER_CHANGE_DELETE ||
+   change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+   change->action == REORDER_BUFFER_CHANGE_TRUNCATE)

I think that a transaction that contains REORDER_BUFFER_CHANGE_MESSAGE can also
be considered streamable. Is there a reason that we don't check it here?

Best regards,
Hou zj



RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-28 Thread houzj.f...@fujitsu.com
On Mon, November 28, 2022 15:19 PM Peter Smith  wrote:
> Here are some review comments for patch v51-0002

Thanks for your comments!

> ==
> 
> 1.
> 
> GENERAL - terminology:  spool/serialize and data/changes/message
> 
> The terminology seems to be used at random. IMO it might be worthwhile 
> rechecking at least that terms are used consistently in all the 
> comments. e.g "serialize message data to disk" ... and later ...
> "apply the spooled messages".
> 
> Also for places where it says "Write the message to file" maybe 
> consider using consistent terminology like "serialize the message to a 
> file".
> 
> Also, try to standardize the way things are described by using 
> consistent (if they really are the same) terminology for "writing 
> data" VS "writing data" VS "writing messages" etc. It is confusing 
> trying to know if the different wording has some intended meaning or 
> is it just random.

I changed some of them, but I think there are some left, which I will recheck
in the next version.
I also think we'd better not change comments that refer to existing comments,
functions, or variables.
For example, it's fine for comments that refer to apply_spooled_message to use
"spool" and "message".


> ==
> 
> Commit message
> 
> 2.
> When the leader apply worker times out while sending a message to the 
> parallel apply worker. Instead of erroring out, switch to partial 
> serialize mode and let the leader serialize all remaining changes to 
> the file and notify the parallel apply workers to read and apply them at the 
> end of the transaction.
> 
> ~
> 
> The first sentence seems incomplete
> 
> SUGGESTION.
> In patch 0001 if the leader apply worker times out while attempting to 
> send a message to the parallel apply worker it results in an ERROR.
> 
> This patch (0002) modifies that behaviour, so instead of erroring it 
> will switch to "partial serialize" mode -  in this mode the leader 
> serializes all remaining changes to a file and notifies the parallel 
> apply workers to read and apply them at the end of the transaction.
> 
> ~~~
> 
> 3.
> 
> This patch 0002 is called “Serialize partial changes to disk if the 
> shm_mq buffer is full”, but the commit message is saying nothing about 
> the buffer filling up. I think the Commit message should be mentioning 
> something that makes the commit patch name more relevant. Otherwise 
> change the patch name.

Changed.

> ==
> 
> .../replication/logical/applyparallelworker.c
> 
> 4. File header comment
> 
> + * timeout is exceeded, the LA will write to file and indicate PA-2 
> + that it
> + * needs to read file for remaining messages. Then LA will start 
> + waiting for
> + * commit which will detect deadlock if any. (See pa_send_data() and 
> + typedef
> + * enum TransApplyAction)
> 
> "needs to read file for remaining messages" -> "needs to read that 
> file for the remaining messages"

Changed.

> ~~~
> 
> 5. pa_free_worker
> 
> + /*
> + * Stop the worker if there are enough workers in the pool.
> + *
> + * XXX we also need to stop the worker if the leader apply worker
> + * serialized part of the transaction data to a file due to send timeout.
> + * This is because the message could be partially written to the 
> + queue due
> + * to send timeout and there is no way to clean the queue other than
> + * resending the message until it succeeds. To avoid complexity, we
> + * directly stop the worker in this case.
> + */
> + if (winfo->serialize_changes ||
> + napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> 
> 5a.
> 
> + * XXX we also need to stop the worker if the leader apply worker
> + * serialized part of the transaction data to a file due to send timeout.
> 
> SUGGESTION
> XXX The worker is also stopped if the leader apply worker needed to 
> serialize part of the transaction data due to a send timeout.
> 
> ~
> 
> 5b.
> 
> + /* Unlink the files with serialized changes. */ if
> + (winfo->serialize_changes)
> + stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
> 
> A better comment might be
> 
> SUGGESTION
> Unlink any files that were needed to serialize partial changes.

Changed.

> ~~~
> 
> 6. pa_spooled_messages
> 
> /*
>  * Replay the spooled messages in the parallel apply worker if leader 
> apply
>  * worker has finished serializing changes to the file.
>  */
> static void
> pa_spooled_messages(void)
> 
> 6a.
> IMO a better name for this function would be 
> pa_apply_spooled_messages();

Not sure about this.

> ~
> 
> 6b.
> "if leader apply" -> "if the leader apply"

Changed.

> ~
> 
> 7.
> 
> + /*
> + * Acquire the stream lock if the leader apply worker is serializing
> + * changes to the file, because the parallel apply worker will no 
> + longer
> + * have a chance to receive a STREAM_STOP and acquire the lock until 
> + the
> + * leader serialize all changes to the file.
> + */
> + if (fileset_state == LEADER_FILESET_BUSY) { 
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock); 
> + 

RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-26 Thread houzj.f...@fujitsu.com
On Tuesday, November 22, 2022 9:53 PM Kuroda, Hayato wrote:
> 
> Thanks for updating the patch!
> I tested the case whether the deadlock caused by foreign key constraint could
> be detected, and it worked well.
> 
> Followings are my review comments. They are basically related with 0001, but
> some contents may be not. It takes time to understand 0002 correctly...

Thanks for the comments!

> 01. typedefs.list
> 
> LeaderFileSetState should be added to typedefs.list.
> 
> 
> 02. 032_streaming_parallel_apply.pl
> 
> As I said in [1]: the test name may be not matched. Do you have reasons to
> revert the change?

The original parallel safety check has been removed, so I changed the name.
After rethinking this, I named it stream_parallel_conflict.

> 
> 03. 032_streaming_parallel_apply.pl
> 
> The test does not cover the case that the backend process relates with the
> deadlock. IIUC this is another motivation to use a stream/transaction lock.
> I think it should be added.

The main deadlock cases that the stream/transaction lock can detect are
1) LA->PA and 2) LA->PA->PA, as explained atop applyparallelworker.c. So I think
the backend-process-related one is a variant that is already covered by the
existing tests in the patch.

> 04. log output
> 
> While being applied spooled changes by PA, there are so many messages like
> "replayed %d changes from file..." and "applied %u changes...". They comes
> from
> apply_handle_stream_stop() and apply_spooled_messages(). They have same
> meaning, so I think one of them can be removed.

Changed.

> 05. system_views.sql
> 
> In the previous version you modified pg_stat_subscription system view. Why
> do you revert that?

I was not sure whether we should include that in the main patch set.
I added a top-up patch that changes the view.

> 06. interrupt.c - SignalHandlerForShutdownRequest()
> 
> In the comment atop SignalHandlerForShutdownRequest(), some processes
> that assign the function except SIGTERM are clarified. We may be able to add
> the parallel apply worker.

Changed


> 08. guc_tables.c - ConfigureNamesInt
> 
> ```
> _sync_workers_per_subscription,
> +   2, 0, MAX_PARALLEL_WORKER_LIMIT,
> +   NULL, NULL, NULL
> +   },
> ```
> 
> The upper limit for max_sync_workers_per_subscription seems to be wrong, it
> should be used for max_parallel_apply_workers_per_subscription.

That was my mistake, sorry about that.

> 10. worker.c - maybe_reread_subscription()
> 
> 
> ```
> +   if (am_parallel_apply_worker())
> +   ereport(LOG,
> +   /* translator: first %s is the name of logical 
> replication
> worker */
> +   (errmsg("%s for subscription \"%s\"
> will stop because of a parameter change",
> +
> + get_worker_name(), MySubscription->name)));
> ```
> 
> I was not sure get_worker_name() is needed. I think "logical replication apply
> worker"
> should be embedded.

Changed.

> 
> 11. worker.c - ApplyWorkerMain()
> 
> ```
> +   (errmsg_internal("%s for subscription \"%s\"
> two_phase is %s",
> +
> + get_worker_name(),
> ```

Changed


Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-26 Thread houzj.f...@fujitsu.com
On Friday, November 25, 2022 10:54 AM Peter Smith  wrote:
> 
> Here are some review comments for v51-0001.

Thanks for the comments!
> ==
> 
> .../replication/logical/applyparallelworker.c
> 
> 1. General - Error messages, get_worker_name()
> 
> I previously wrote a comment to ask if the get_worker_name() should be used
> in more places but the reply [1, #2b] was:
> 
> > 2b.
> > Consider if maybe all of these ought to be calling get_worker_name()
> > which is currently static in worker.c. Doing this means any future
> > changes to get_worker_name won't cause more inconsistencies.
> 
> The most error message in applyparallelxx.c can only use "xx parallel worker",
> so I think it's fine not to call get_worker_name
> 
> ~
> 
> I thought the reply missed the point I was trying to make -- I meant if it was
> arranged now so *every* message would go via
> get_worker_name() then in future somebody wanted to change the names (e.g.
> from "logical replication parallel apply worker" to "LR PA
> worker") then it would only need to be changed in one central place instead of
> hunting down every hardwired error message.
> 

Thanks for the suggestion. I understand your point, but I feel that using
get_worker_name() in places where the worker type is already decided could make
developers think that all kinds of workers can enter this code, and I am not
sure that is better. So I didn't change this.
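
For reference, the centralized approach under discussion could look roughly
like the sketch below. This is only an illustration of the trade-off, not the
code in worker.c: DemoWorkerType and demo_worker_name() are hypothetical names,
and only the two worker-name strings quoted in this thread are used.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical worker kinds, for illustration only. */
typedef enum DemoWorkerType
{
	DEMO_WORKER_APPLY,
	DEMO_WORKER_PARALLEL_APPLY
} DemoWorkerType;

/*
 * Single place that maps a worker kind to its user-visible name.  Renaming
 * a worker then means touching only this function, at the cost of call
 * sites no longer showing which kind of worker can actually reach them.
 */
const char *
demo_worker_name(DemoWorkerType type)
{
	switch (type)
	{
		case DEMO_WORKER_PARALLEL_APPLY:
			return "logical replication parallel apply worker";
		case DEMO_WORKER_APPLY:
			break;
	}
	return "logical replication apply worker";
}
```

A hardwired message avoids the extra indirection, which is the reason given
above for leaving applyparallelworker.c as it is.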

> 
> 2. HandleParallelApplyMessage
> 
> + case 'X': /* Terminate, indicating clean exit. */
> + shm_mq_detach(winfo->error_mq_handle);
> + winfo->error_mq_handle = NULL;
> + break;
> + default:
> + elog(ERROR, "unrecognized message type received from logical
> replication parallel apply worker: %c (message length %d bytes)",
> + msgtype, msg->len);
> 
> The case 'X' code indentation is too much.

Changed.

> ==
> 
> src/backend/replication/logical/origin.c
> 
> 3. replorigin_session_setup(RepOriginId node, int acquired_by)
> 
> @@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
>   * array doesn't have to be searched when calling
>   * replorigin_session_advance().
>   *
> - * Obviously only one such cached origin can exist per process and the 
> current
> + * Normally only one such cached origin can exist per process and the
> + current
>   * cached value can only be set again after the previous value is torn down
>   * with replorigin_session_reset().
> + *
> + * However, we do allow multiple processes to point to the same origin
> + slot if
> + * requested by the caller by passing PID of the process that has
> + already
> + * acquired it as acquired_by. This is to allow multiple parallel apply
> + * processes to use the same origin, provided they maintain commit
> + order, for
> + * example, by allowing only one process to commit at a time. For the
> + first
> + * process requesting this origin, the acquired_by parameter needs to
> + be set to
> + * 0.
>   */
>  void
> -replorigin_session_setup(RepOriginId node)
> +replorigin_session_setup(RepOriginId node, int acquired_by)
> 
> I think the meaning of the acquired_by=0 is not fully described here:
> "For the first process requesting this origin, the acquired_by parameter needs
> to be set to 0."
> IMO that seems to be describing it only from POV that you are always going to
> want to allow multiple processes. But really this is an optional feature so 
> you
> might pass acquired_by=0, not just because this is the first of multiple, but 
> also
> because you *never* want to allow multiple at all. The comment does not
> convey this meaning.
> 
> Maybe something worded like below is better?
> 
> SUGGESTION
> Normally only one such cached origin can exist per process so the cached value
> can only be set again after the previous value is torn down with
> replorigin_session_reset(). For this normal case pass
> acquired_by=0 (meaning the slot is not allowed to be already acquired by
> another process).
> 
> However, sometimes multiple processes can safely re-use the same origin slot
> (for example, multiple parallel apply processes can safely use the same 
> origin,
> provided they maintain commit order by allowing only one process to commit
> at a time). For this case the first process must pass acquired_by=0, and then 
> the
> other processes sharing that same origin can pass acquired_by=PID of the first
> process.

Changed as suggested.
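
As a rough model of the acquired_by semantics described in the suggested
comment, consider the sketch below. It is a simplification under stated
assumptions and not the origin.c code: DemoOriginSlot, its nusers counter, and
demo_origin_setup() are hypothetical, and errors are reported by returning
false instead of raising an ERROR.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified model of a replication origin slot. */
typedef struct DemoOriginSlot
{
	int			acquired_by;	/* PID of the first acquirer, 0 if free */
	int			nusers;			/* illustrative count of attached processes */
} DemoOriginSlot;

/*
 * Attach the process my_pid to the slot.
 *
 * acquired_by == 0: normal case, the slot must not be held by anyone.
 * acquired_by != 0: sharing case, the slot must already be held by exactly
 * that PID (the first process, which itself passed 0).
 */
bool
demo_origin_setup(DemoOriginSlot *slot, int my_pid, int acquired_by)
{
	if (acquired_by == 0)
	{
		if (slot->acquired_by != 0)
			return false;		/* already acquired by another process */
		slot->acquired_by = my_pid;
	}
	else if (slot->acquired_by != acquired_by)
		return false;			/* not held by the process we were told */

	slot->nusers++;
	return true;
}
```

A leader would call demo_origin_setup(&slot, leader_pid, 0), and each parallel
apply process would then pass the leader's PID as acquired_by.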

> ==
> 
> src/backend/replication/logical/worker.c
> 
> 4. GENERAL - get_worker_name()
> 
> If you decide it is OK to hardwire some error messages instead of
> unconditionally calling the get_worker_name() -- see my #1 review comment in
> this post -- then there are some other messages in this file that also seem 
> like
> they can be also hardwired because the type of worker is already known.
> 
> Here are some examples:
> 
> 4a.
> 
> + else if (am_parallel_apply_worker())
> + {
> + if (rel->state != SUBREL_STATE_READY)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + /* translator: 

Avoid distributing new catalog snapshot again for the transaction being decoded.

2022-11-25 Thread houzj.f...@fujitsu.com
Hi,

While doing some other related work, I noticed that when decoding the COMMIT
record via SnapBuildCommitTxn() -> SnapBuildDistributeNewCatalogSnapshot(), we
add a new snapshot to all transactions, including the one being decoded (the
one that has just committed).

But since we've already made the required modifications to the snapshot for the
current transaction being decoded (in SnapBuildCommitTxn()), I think we can
avoid adding a snapshot for it again.

Here is the patch to improve this.
Thoughts ?

Best regards,
Hou zhijie



0001-Avoid-distributing-new-catalogsnapshot-for-the-trans.patch
Description:  0001-Avoid-distributing-new-catalogsnapshot-for-the-trans.patch


RE: wake up logical workers after ALTER SUBSCRIPTION

2022-11-21 Thread houzj.f...@fujitsu.com
On Tuesday, November 22, 2022 2:49 PM Hayato Kuroda (Fujitsu) wrote:

> 
> Dear Nathan,
> 
> > I think you are correct.  I did it this way in v2.  I've also moved
> > the bulk of the logic to logical/worker.c.
> 
> Thanks for updating! It becomes better. Further comments:
> 
> 01. AlterSubscription()
> 
> ```
> + LogicalRepWorkersWakeupAtCommit(subid);
> +
> ```
> 
> Currently subids will be recorded even if the subscription is not modified.
> I think LogicalRepWorkersWakeupAtCommit() should be called inside the if
> (update_tuple).

I think an exception would be REFRESH PUBLICATION, in which case update_tuple is
false, but it seems better to wake up the apply worker in this case as well,
because the apply worker is also responsible for starting table sync workers
for newly subscribed tables (in process_syncing_tables()).

Besides, it doesn't seem necessary to wake up the apply worker for ALTER SKIP
TRANSACTION, although there might be no harm in waking it up in this case.

> 
> 02. LogicalRepWorkersWakeupAtCommit()
> 
> ```
> + oldcxt = MemoryContextSwitchTo(TopTransactionContext);
> + on_commit_wakeup_workers_subids =
> lappend_oid(on_commit_wakeup_workers_subids,
> +
> subid);
> ```
> 
> If the subscription is altered twice in the same transaction, the same subid 
> will
> be recorded twice.
> I'm not sure whether it may be caused some issued, but list_member_oid() can
> be used to avoid that.

+1, list_append_unique_oid might be better.
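
The effect of an append-unique call can be sketched with a standalone model.
This does not use the PostgreSQL List API: DemoOidList and
demo_append_unique_oid() are hypothetical, and a fixed-size array stands in for
the list kept in TopTransactionContext.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_MAX_SUBIDS 16

typedef unsigned int DemoOid;

/* Hypothetical fixed-size stand-in for the list of subscription OIDs. */
typedef struct DemoOidList
{
	DemoOid		items[DEMO_MAX_SUBIDS];
	size_t		len;
} DemoOidList;

/*
 * Append oid unless it is already present.  Returns true if the list grew,
 * false if the oid was a duplicate or the demo array is full.
 */
bool
demo_append_unique_oid(DemoOidList *list, DemoOid oid)
{
	for (size_t i = 0; i < list->len; i++)
	{
		if (list->items[i] == oid)
			return false;		/* already recorded in this transaction */
	}

	if (list->len >= DEMO_MAX_SUBIDS)
		return false;

	list->items[list->len++] = oid;
	return true;
}
```

Altering the same subscription twice in one transaction then records its subid
only once, so each worker is woken up a single time at commit.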

Best regards,
Hou zj




RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-21 Thread houzj.f...@fujitsu.com
On  Monday, November 21, 2022 2:26 PM Peter Smith  wrote:
> On Fri, Nov 18, 2022 at 6:03 PM Peter Smith 
> wrote:
> >
> > Here are some review comments for v47-0001
> >
> > (This review is a WIP - I will post more comments for this patch next
> > week)
> >
> 
> Here are the rest of my comments for v47-0001

Thanks for the comments!

> ==
> 
> doc/src/sgml/monitoring.
> 
> 1.
> 
> @@ -1851,6 +1851,11 @@ postgres   27093  0.0  0.0  30096  2752 ?
>  Ss   11:34   0:00 postgres: ser
>Waiting to acquire an advisory user lock.
>   
>   
> +  applytransaction
> +  Waiting to acquire acquire a lock on a remote transaction being
> +  applied on the subscriber side.
> + 
> + 
> 
> 1a.
> Typo "acquire acquire"

Fixed.

> ~
> 
> 1b.
> Maybe "on the subscriber side" does not mean much without any context.
> Maybe better to word it as below.
> 
> SUGGESTION
> Waiting to acquire a lock on a remote transaction being applied by a logical
> replication subscriber.

Changed.

> ==
> 
> doc/src/sgml/system-views.sgml
> 
> 2.
> 
> @@ -1361,8 +1361,9 @@
> virtualxid,
> spectoken,
> object,
> -   userlock, or
> -   advisory.
> +   userlock,
> +   advisory or
> +   applytransaction.
> 
> This change removed the Oxford comma that was there before. I assume it was
> unintended.

Changed.

> ==
> 
> .../replication/logical/applyparallelworker.c
> 
> 3. globals
> 
> The parallel_apply_XXX functions were all shortened to pa_XXX.
> 
> I wondered if the same simplification should be done also to the global
> statics...
> 
> e.g.
> ParallelApplyWorkersHash -> PAWorkerHash ParallelApplyWorkersList ->
> PAWorkerList ParallelApplyMessagePending -> PAMessagePending etc...

I personally feel these names look fine.

> ~~~
> 
> 4. pa_get_free_worker
> 
> + foreach(lc, active_workers)
> + {
> + ParallelApplyWorkerInfo *winfo = NULL;
> +
> + winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
> 
> No need to assign NULL because the next line just overwrites that anyhow.

Changed.

> ~
> 
> 5.
> 
> + /*
> + * Try to free the worker first, because we don't wait for the rollback
> + * command to finish so the worker may not be freed at the end of the
> + * transaction.
> + */
> + if (pa_free_worker(winfo, winfo->shared->xid)) continue;
> +
> + if (!winfo->in_use)
> + return winfo;
> 
> Shouldn't the (!winfo->in_use) check be done first as well -- e.g. why are we
> trying to free a worker which is maybe not even in_use?
> 
> SUGGESTION (this will need some comment to explain what it is doing) if
> (!winfo->in_use || !pa_free_worker(winfo, winfo->shared->xid) &&
> !winfo->in_use)
> return winfo;

Since pa_free_worker checks the in_use flag as well, and the current style
looks clean to me, I didn't change this.

But it seems we need to first call pa_free_worker for every worker and then
choose a free one; otherwise a stopped worker's info (shared memory or ...)
might be left around for a long time. I will think about this and try to fix it
in the next version.
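
The two-pass idea could be sketched as follows. This is only an outline under
simplifying assumptions: DemoWorker, demo_try_free() and demo_get_free_worker()
are hypothetical, and the real pa_free_worker may also stop the worker and
remove it from the list, which is ignored here.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified per-worker state. */
typedef struct DemoWorker
{
	bool		in_use;			/* assigned to a transaction */
	bool		finished;		/* its transaction has completed */
} DemoWorker;

/* Stand-in for pa_free_worker(): release a worker whose work is done. */
void
demo_try_free(DemoWorker *w)
{
	if (w->in_use && w->finished)
	{
		w->in_use = false;
		w->finished = false;
	}
}

/*
 * Return the index of a free worker, or -1 if none is available.
 *
 * Pass 1 gives every worker the chance to be freed, so that no finished
 * worker is skipped just because an earlier one was returned first.
 * Pass 2 then picks the first worker that is not in use.
 */
int
demo_get_free_worker(DemoWorker *workers, int nworkers)
{
	for (int i = 0; i < nworkers; i++)
		demo_try_free(&workers[i]);

	for (int i = 0; i < nworkers; i++)
	{
		if (!workers[i].in_use)
			return i;
	}
	return -1;
}
```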

> ~~~
> 
> 6. pa_free_worker
> 
> +/*
> + * Remove the parallel apply worker entry from the hash table. Stop the
> +work if
> + * there are enough workers in the pool.
> + *
> 
> Typo? "work" -> "worker"
> 

Fixed.

> 
> 7.
> 
> + /* Are there enough workers in the pool? */ if (napplyworkers >
> + (max_parallel_apply_workers_per_subscription / 2)) {
> 
> IMO that comment should be something more like "Don't detach/stop the
> worker unless..."
> 

Improved.

> 
> 8. pa_send_data
> 
> + /*
> + * Retry after 1s to reduce the cost of getting the system time and
> + * calculating the time difference.
> + */
> + (void) WaitLatch(MyLatch,
> + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000L,
> + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
> 
> 8a.
> I am not sure you need to explain the reason in the comment. Just saying "Wait
> before retrying." seems sufficient to me.

Changed.

> ~
> 
> 8b.
> Instead of the hardwired "1s" in the comment, and 1000L in the code, maybe
> better to just have another constant.
> 
> SUGGESTION
> #define SHM_SEND_RETRY_INTERVAL_MS 1000
> #define SHM_SEND_TIMEOUT_MS 1

Changed.

> ~
> 
> 9.
> 
> + if (startTime == 0)
> + startTime = GetCurrentTimestamp();
> + else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
> 
> IMO the initial startTime should be at top of the function otherwise the 
> timeout
> calculation seems wrong.

Setting startTime at the beginning would add unnecessary cost when we don't
need to retry, and starting the count from the first failure looks fine to me.
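
A minimal sketch of that retry logic is shown below, assuming a simulated clock
so that it can run standalone. DemoClock, demo_send_with_timeout() and
demo_fail_n_times() are hypothetical, and the timeout value is an arbitrary
choice for the sketch, not the one used in the patch.

```c
#include <assert.h>
#include <stdbool.h>

#define DEMO_RETRY_INTERVAL_MS 1000
#define DEMO_SEND_TIMEOUT_MS 10000	/* arbitrary value for this sketch */

/* Simulated clock, so that no real waiting is needed. */
typedef struct DemoClock
{
	long		now_ms;
} DemoClock;

typedef bool (*demo_try_send_fn) (void *arg);

/*
 * Try to send until it succeeds or the timeout expires.  The clock is read
 * only after the first failed attempt, so the common case (first attempt
 * succeeds) pays nothing for the timeout bookkeeping.
 */
bool
demo_send_with_timeout(demo_try_send_fn try_send, void *arg, DemoClock *clk)
{
	long		start_ms = -1;	/* unset until the first failure */

	for (;;)
	{
		if (try_send(arg))
			return true;

		if (start_ms < 0)
			start_ms = clk->now_ms;
		else if (clk->now_ms - start_ms >= DEMO_SEND_TIMEOUT_MS)
			return false;

		/* Wait before retrying (simulated by advancing the clock). */
		clk->now_ms += DEMO_RETRY_INTERVAL_MS;
	}
}

/* Example callback: fails until the counter pointed to by arg reaches 0. */
bool
demo_fail_n_times(void *arg)
{
	int		   *remaining = (int *) arg;

	if (*remaining > 0)
	{
		(*remaining)--;
		return false;
	}
	return true;
}
```

Because the timeout is measured from the first failure, the total time spent
can exceed the timeout by the duration of the first attempt, which seems an
acceptable trade-off here.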

> ==
> 
> src/backend/replication/logical/worker.c
> 
> 10. handle_streamed_transaction
> 
> + * In streaming case (receiving a block of streamed transaction), for
> + * SUBSTREAM_ON mode, simply redirect it to a file for the proper
> + toplevel
> + * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to
> + parallel
> + * apply workers (LOGICAL_REP_MSG_RELATION or 

RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-21 Thread houzj.f...@fujitsu.com
On Friday, November 18, 2022 8:36 AM Masahiko Sawada  
wrote:
> 
> Here are review comments on v47-0001 and v47-0002 patches:

Thanks for the comments!

> When the parallel apply worker exited, I got the following server log.
> I think this log is not appropriate since the worker was not terminated by
> administrator command but exited by itself. Also, probably it should exit with
> exit code 0?
> 
> FATAL:  terminating logical replication worker due to administrator command
> LOG:  background worker "logical replication parallel worker" (PID
> 3594918) exited with exit code 1

Changed to report a LOG message and exit with code 0.

> ---
> /*
>  * Stop the worker if there are enough workers in the pool or the leader
>  * apply worker serialized part of the transaction data to a file due to
>  * send timeout.
>  */
> if (winfo->serialize_changes ||
> napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> 
> Why do we need to stop the worker if the leader serializes changes?

Because a partially sent message might be left in the memory queue if the send
times out. We then need to either re-send the same message until it succeeds or
detach from the memory queue. To keep the logic simple, the patch directly
stops the worker in this case.
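
The condition quoted above can be restated as a small predicate. This is just a
sketch of the decision, with a hypothetical function name; the parameter names
mirror the variables in the quoted code.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Decide whether a parallel apply worker should be stopped instead of being
 * kept in the pool.  A worker whose leader serialized changes after a send
 * timeout may have a partial message in its queue, so it is never reused.
 * Otherwise the worker is stopped only when the pool is more than half full.
 */
bool
demo_should_stop_worker(bool serialize_changes,
						int napplyworkers,
						int max_parallel_apply_workers_per_subscription)
{
	return serialize_changes ||
		napplyworkers > (max_parallel_apply_workers_per_subscription / 2);
}
```

With a limit of 2, for example, one idle worker is kept, while a second one or
any worker that hit the serialize path is stopped.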


> ---
> +/*
> + * Release all session level locks that could be held in parallel 
> apply
> + * mode.
> + */
> +LockReleaseAll(DEFAULT_LOCKMETHOD, true);
> +
> 
> I think we call LockReleaseAll() at the process exit (in ProcKill()), but do 
> we
> really need to do LockReleaseAll() here too?

If we don't release the locks before ProcKill, we might trip an Assert at the
beginning of ProcKill that ensures all locks have been released.
And it seems ProcKill doesn't release session-level locks after that Assert.
So I think we'd better release them here.

> ---
> 
> +elog(ERROR, "could not find replication state slot
> for replication"
> + "origin with OID %u which was acquired by
> %d", node, acquired_by);
> 
> Let's not break the error log message in the middle so that the user can 
> search
> the message by grep easily.

Changed.

> ---
> +{
> +{"max_parallel_apply_workers_per_subscription",
> +PGC_SIGHUP,
> +REPLICATION_SUBSCRIBERS,
> +gettext_noop("Maximum number of parallel
> apply workers per subscription."),
> +NULL,
> +},
> +_parallel_apply_workers_per_subscription,
> +2, 0, MAX_BACKENDS,
> +NULL, NULL, NULL
> +},
> +
> 
> I think we should use MAX_PARALLEL_WORKER_LIMIT as the max value instead.
> MAX_BACKENDS is too high.

Changed.

> ---
> +/*
> + * Indicates whether there are pending messages in the queue.
> The parallel
> + * apply worker will check it before starting to wait.
> + */
> +pg_atomic_uint32   pending_message_count;
> 
> The "pending messages" sounds like individual logical replication messages
> such as LOGICAL_REP_MSG_INSERT. But IIUC what this value actually shows is
> how many streamed chunks are pending to process, right?

Yes, renamed this.

> ---
> When the parallel apply worker raises an error, I got the same error twice 
> from
> the leader worker and parallel worker as follows. Can we suppress either one?
> 
> 2022-11-17 17:30:23.490 JST [3814552] LOG:  logical replication parallel apply
> worker for subscription "test_sub1" has started
> 2022-11-17 17:30:23.490 JST [3814552] ERROR:  duplicate key value violates
> unique constraint "test1_c_idx"
> 2022-11-17 17:30:23.490 JST [3814552] DETAIL:  Key (c)=(1) already exists.
> 2022-11-17 17:30:23.490 JST [3814552] CONTEXT:  processing remote data for
> replication origin "pg_16390" during message type "INSERT" for replication
> target relatio n "public.test1" in transaction 731
> 2022-11-17 17:30:23.490 JST [3814550] ERROR:  duplicate key value violates
> unique constraint "test1_c_idx"
> 2022-11-17 17:30:23.490 JST [3814550] DETAIL:  Key (c)=(1) already exists.
> 2022-11-17 17:30:23.490 JST [3814550] CONTEXT:  processing remote data for
> replication origin "pg_16390" during message type "INSERT" for replication
> target relatio n "public.test1" in transaction 731
> parallel apply worker

It seems similar to the behavior of parallel query, which will report the same
error twice. But I agree it might be better for the leader to report something
different. So, I changed the error message reported by the leader in the new
version of the patch.

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-10 Thread houzj.f...@fujitsu.com
On Monday, November 7, 2022 6:18 PM Masahiko Sawada  
wrote:
> 
> On Thu, Nov 3, 2022 at 10:06 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Wednesday, November 2, 2022 10:50 AM Masahiko Sawada
>  wrote:
> > >
> > > On Mon, Oct 24, 2022 at 8:42 PM Masahiko Sawada
> > >  wrote:
> > > >
> > > > On Wed, Oct 12, 2022 at 3:04 PM Amit Kapila
> 
> > > wrote:
> > > > >
> > > > > On Tue, Oct 11, 2022 at 5:52 AM Masahiko Sawada
> > >  wrote:
> > > > > >
> > > > > > On Fri, Oct 7, 2022 at 2:00 PM Amit Kapila
> 
> > > wrote:
> > > > > > >
> > > > > > > About your point that having different partition structures for
> > > > > > > publisher and subscriber, I don't know how common it will be once
> we
> > > > > > > have DDL replication. Also, the default value of
> > > > > > > publish_via_partition_root is false which doesn't seem to indicate
> > > > > > > that this is a quite common case.
> > > > > >
> > > > > > So how can we consider these concurrent issues that could happen
> only
> > > > > > when streaming = 'parallel'? Can we restrict some use cases to avoid
> > > > > > the problem or can we have a safeguard against these conflicts?
> > > > > >
> > > > >
> > > > > Yeah, right now the strategy is to disallow parallel apply for such
> > > > > cases as you can see in *0003* patch.
> > > >
> > > > Tightening the restrictions could work in some cases but there might
> > > > still be coner cases and it could reduce the usability. I'm not really
> > > > sure that we can ensure such a deadlock won't happen with the current
> > > > restrictions. I think we need something safeguard just in case. For
> > > > example, if the leader apply worker is waiting for a lock acquired by
> > > > its parallel worker, it cancels the parallel worker's transaction,
> > > > commits its transaction, and restarts logical replication. Or the
> > > > leader can log the deadlock to let the user know.
> > > >
> > >
> > > As another direction, we could make the parallel apply feature robust
> > > if we can detect deadlocks that happen among the leader worker and
> > > parallel workers. I'd like to summarize the idea discussed off-list
> > > (with Amit, Hou-San, and Kuroda-San) for discussion. The basic idea is
> > > that when the leader worker or parallel worker needs to wait for
> > > something (eg. transaction completion, messages) we use lmgr
> > > functionality so that we can create wait-for edges and detect
> > > deadlocks in lmgr.
> > >
> > > For example, a scenario where a deadlock occurs is the following:
> > >
> > > [Publisher]
> > > create table tab1(a int);
> > > create publication pub for table tab1;
> > >
> > > [Subcriber]
> > > creat table tab1(a int primary key);
> > > create subscription sub connection 'port=1 dbname=postgres'
> > > publication pub with (streaming = parallel);
> > >
> > > TX1:
> > > BEGIN;
> > > INSERT INTO tab1 SELECT i FROM generate_series(1, 5000) s(i); -- streamed
> > > Tx2:
> > > BEGIN;
> > > INSERT INTO tab1 SELECT i FROM generate_series(1, 5000) s(i); --
> streamed
> > > COMMIT;
> > > COMMIT;
> > >
> > > Suppose a parallel apply worker (PA-1) is executing TX-1 and the
> > > leader apply worker (LA) is executing TX-2 concurrently on the
> > > subscriber. Now, LA is waiting for PA-1 because of the unique key of
> > > tab1 while PA-1 is waiting for LA to send further messages. There is a
> > > deadlock between PA-1 and LA but lmgr cannot detect it.
> > >
> > > One idea to resolve this issue is that we have LA acquire a session
> > > lock on a shared object (by LockSharedObjectForSession()) and have
> > > PA-1 wait on the lock before trying to receive messages. IOW,  LA
> > > acquires the lock before sending STREAM_STOP and releases it if
> > > already acquired before sending STREAM_START, STREAM_PREPARE and
> > > STREAM_COMMIT. For PA-1, it always needs to acquire the lock after
> > > processing STREAM_STOP and then release immediately after acquiring
> > > it. That way, when PA-1 is waiting for LA, we can have a wait-edge
> > > from PA-1 

RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-10 Thread houzj.f...@fujitsu.com
On Tuesday, November 8, 2022 7:50 PM Amit Kapila  
wrote:
> On Mon, Nov 7, 2022 at 6:49 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Friday, November 4, 2022 7:45 PM Amit Kapila
>  wrote:
> > > 3.
> > > apply_handle_stream_start(StringInfo s) { ...
> > > + if (!first_segment)
> > > + {
> > > + /*
> > > + * Unlock the shared object lock so that parallel apply worker
> > > + * can continue to receive and apply changes.
> > > + */
> > > + parallel_apply_unlock(winfo->shared->stream_lock_id);
> > > ...
> > > }
> > >
> > > Can we have an assert before this unlock call that the lock must be
> > > held? Similarly, if there are other places then we can have assert
> > > there as well.
> >
> > It seems we don't have a standard API can be used without a transaction.
> > Maybe we can use the list ParallelApplyLockids to check that ?
> >
> 
> Yeah, that occurred to me as well but I am not sure if it is a good
> idea to maintain this list just for assertion but if it turns out that
> we need to maintain it for a different purpose then we can probably
> use it for assert as well.
> 
> Few other comments/questions:
> =
> 1.
> apply_handle_stream_start(StringInfo s)
> {
> ...
> 
> + case TRANS_PARALLEL_APPLY:
> ...
> ...
> + /*
> + * Unlock the shared object lock so that the leader apply worker
> + * can continue to send changes.
> + */
> + parallel_apply_unlock(MyParallelShared->stream_lock_id,
> AccessShareLock);
> 
> As per the design in the email [1], this lock needs to be released by
> the leader worker during stream start which means it should be
> released under the state TRANS_LEADER_SEND_TO_PARALLEL. From the
> comments as well, it is not clear to me why at this time leader is
> supposed to be blocked. Is there a reason for doing differently than
> what is proposed in the original design?
> 2. Similar to above, it is not clear why the parallel worker needs to
> release the stream_lock_id lock at stream_commit and stream_prepare?

Sorry, these were my mistakes. Changed.

> 3. Am I understanding correctly that you need to lock/unlock in
> apply_handle_stream_abort() for the parallel worker because after
> rollback to savepoint, there could be another set of stream or
> transaction end commands for which you want to wait? If so, maybe an
> additional comment would serve the purpose.

I think you are right. I will think about this in case I missed something and
add some comments in the next version.

> 4.
> The leader may have sent multiple streaming blocks in the queue
> + * When the child is processing a streaming block. So only try to
> + * lock if there is no message left in the queue.
> 
> Let's slightly reword this to: "By the time child is processing the
> changes in the current streaming block, the leader may have sent
> multiple streaming blocks. So, try to lock only if there is no message
> left in the queue."

Changed.

> 5.
> +parallel_apply_unlock(uint16 lockid, LOCKMODE lockmode)
> +{
> + if (!list_member_int(ParallelApplyLockids, lockid))
> + return;
> +
> + UnlockSharedObjectForSession(SubscriptionRelationId,
> MySubscription->oid,
> + lockid, am_leader_apply_worker() ?
> + AccessExclusiveLock:
> + AccessShareLock);
> 
> This function should use lockmode argument passed rather than deciding
> based on am_leader_apply_worker. I think this is anyway going to
> change if we start using a different locktag as discussed in one of
> the above emails.

Changed.
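
To illustrate the point about honoring the lockmode argument, here is a minimal
standalone sketch. It is not the patch code: SketchLockMode, sketch_lock and
sketch_unlock are hypothetical stand-ins for the real lock manager calls, and
the fixed-size table is only an assumption to keep the example self-contained.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for lock modes; not the real LOCKMODE values. */
typedef enum { MODE_NONE = 0, MODE_SHARE, MODE_EXCLUSIVE } SketchLockMode;

#define SKETCH_MAX_LOCKS 8

/* One slot per lock id: the mode this session currently holds, if any. */
static SketchLockMode held[SKETCH_MAX_LOCKS];

/* Record that lockid is now held in the given mode. */
static void
sketch_lock(uint16_t lockid, SketchLockMode mode)
{
    assert(lockid < SKETCH_MAX_LOCKS && mode != MODE_NONE);
    held[lockid] = mode;
}

/*
 * Release lockid in exactly the mode the caller passes in, instead of
 * guessing the mode from the kind of worker. Returns false when the lock
 * is not held in that mode, so a mismatch is visible to the caller.
 */
static bool
sketch_unlock(uint16_t lockid, SketchLockMode mode)
{
    assert(lockid < SKETCH_MAX_LOCKS);
    if (held[lockid] != mode)
        return false;
    held[lockid] = MODE_NONE;
    return true;
}
```

With this shape, a caller that took the lock in share mode has to release it in
share mode, whichever worker it runs in.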

> 6.
> +
>  /*
>   * Common spoolfile processing.
>   */
> -static void
> -apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
> +void
> +apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
> 
> Seems like a spurious line addition.

Removed.

> Fair point. I think if the user wants, she can join with pg_stat_subscription
> based on PID and find the corresponding subscription. However, if we want to
> identify everything via pg_locks then I think we should also mention classid
> or database id as field1. So, it would look like: field1: (pg_subscription's
> oid or current db id); field2: OID of subscription in pg_subscription;
> field3: local or remote xid; field4: 0/1 to differentiate between remote and
> local xid.

I tried to use the local xid to lock the transaction, but we can currently get
the local xid only after applying the first change. It's also possible that the
first change in a parallel apply worker is blocked by another parallel apply
worker, which means the parallel apply worker might not have a chance to share
the local xid with the leader.

To resolve this, I tried

RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-10 Thread houzj.f...@fujitsu.com
On Monday, November 7, 2022 4:17 PM Peter Smith 
> 
> Here are my review comments for v42-0001

Thanks for the comments.
> ==
> 
> 28. handle_streamed_transaction
> 
>  static bool
>  handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)  {
> - TransactionId xid;
> + TransactionId current_xid;
> + ParallelApplyWorkerInfo *winfo;
> + TransApplyAction apply_action;
> + StringInfoData origin_msg;
> +
> + apply_action = get_transaction_apply_action(stream_xid, &winfo);
> 
>   /* not in streaming mode */
> - if (!in_streamed_transaction)
> + if (apply_action == TRANS_LEADER_APPLY)
>   return false;
> 
> - Assert(stream_fd != NULL);
>   Assert(TransactionIdIsValid(stream_xid));
> 
> + origin_msg = *s;
> 
> ~
> 
> 28b.
> Why not assign it at the declaration, the same as apply_handle_stream_prepare
> does?

The assignment is unnecessary for non-streaming transactions, so I delayed it.
> ~
> 
> 44b.
> If this is always written to a file, then wouldn't a better function name be
> something including the word "serialize" - e.g.
> serialize_message()?

I feel it would be better to be consistent with the existing style
stream_xxx_xx().

I think I have addressed all the comments, but since quite a bit of logic has
changed in the new version, I might have missed something. Some code wrapping
also needs to be adjusted; I plan to run pgindent for the next version.

Best regards,
Hou zj



RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-10 Thread houzj.f...@fujitsu.com
On Monday, November 7, 2022 7:43 PM Kuroda, Hayato/黒田 隼人 wrote:
> 
> Dear Hou,
> 
> The following are my comments. I want to consider the patch more, but I am
> sending these first.

Thanks for the comments.

> 
> ===
> worker.c
> 
> 01. typedef enum TransApplyAction
> 
> ```
> /*
>  * What action to take for the transaction.
>  *
>  * TRANS_LEADER_APPLY means that we are in the leader apply worker and changes
>  * of the transaction are applied directly in the worker.
>  *
>  * TRANS_LEADER_SERIALIZE means that we are in the leader apply worker or table
>  * sync worker. Changes are written to temporary files and then applied when
>  * the final commit arrives.
>  *
>  * TRANS_LEADER_SEND_TO_PARALLEL means that we are in the leader apply worker
>  * and need to send the changes to the parallel apply worker.
>  *
>  * TRANS_PARALLEL_APPLY means that we are in the parallel apply worker and
>  * changes of the transaction are applied directly in the worker.
>  */
> ```
> 
> TRANS_LEADER_PARTIAL_SERIALIZE should be listed in.
> 

Added.

> 02. handle_streamed_transaction()
> 
> ```
> +   StringInfoData  origin_msg;
> ...
> +   origin_msg = *s;
> ...
> +   /* Write the change to the current file */
> +   stream_write_change(action,
> +                       apply_action == TRANS_LEADER_SERIALIZE ?
> +                       s : &origin_msg);
> ```
> 
> I'm not sure why origin_msg is needed. Can we remove the conditional
> operator?

Currently, the parallel apply worker needs the transaction xid of this change
to define a savepoint, so the original message needs to be written to the file.

> 
> 03. apply_handle_stream_start()
> 
> ```
> + * XXX We can avoid sending pairs of the START/STOP messages to the parallel
> + * worker because unlike apply worker it will process only one transaction at a
> + * time. However, it is not clear whether any optimization is worthwhile
> + * because these messages are sent only when the logical_decoding_work_mem
> + * threshold is exceeded.
> ```
> 
> This comment should be modified because PA must acquire and release locks at
> that time.
> 
> 
> 04. apply_handle_stream_prepare()
> 
> ```
> +   /*
> +    * After sending the data to the parallel apply worker, wait for
> +    * that worker to finish. This is necessary to maintain commit
> +    * order which avoids failures due to transaction dependencies and
> +    * deadlocks.
> +    */
> +   parallel_apply_wait_for_xact_finish(winfo->shared);
> ```
> 
> This does not seem correct. LA may not send data but instead spill changes to a file.

Changed.

> 05. apply_handle_stream_commit()
> 
> ```
> +   if (apply_action == TRANS_LEADER_PARTIAL_SERIALIZE)
> +       stream_cleanup_files(MyLogicalRepWorker->subid, xid);
> ```
> 
> I'm not sure whether the stream files should be removed by LA or PAs. Could
> you tell me the reason why you chose LA?

I think it is more natural for only the LA to create, write and delete the file,
while the PA only needs to read from it.

> ===
> applyparallelworker.c
> 
> 05. parallel_apply_can_start()
> 
> ```
> +   if (switching_to_serialize)
> +   return false;
> ```
> 
> Could you add a comment like:
> Don't start a new parallel apply worker if the leader apply worker has been
> spilling changes to the disk temporarily.

This code has been removed.

> 06. parallel_apply_start_worker()
> 
> ```
> +   /*
> +    * Set the xact_state flag in the leader instead of the
> +    * parallel apply worker to avoid the race condition where the leader has
> +    * already started waiting for the parallel apply worker to finish
> +    * processing the transaction while the child process has not yet
> +    * processed the first STREAM_START and has not set the
> +    * xact_state to true.
> +    */
> ```
> 
> I think the word "flag" should be used for booleans, so the comment should be
> modified.
> (There are many such code comments; all of them should be modified.)

Changed.

> 
> 07. parallel_apply_get_unique_id()
> 
> ```
> +/*
> + * Returns the unique id among all parallel apply workers in the subscriber.
> + */
> +static uint16
> +parallel_apply_get_unique_id()
> ```
> 
> I think this function is inefficient: the computational complexity increases
> linearly with the number of PAs. I think the Bitmapset data structure may be
> used.

This function is removed.

> 08. parallel_apply_send_data()
> 
> ```
> #define CHANGES_THRESHOLD 1000
> #define SHM_SEND_TIMEOUT_MS   1
> ```
> 
> I think the timeout may be too long. Could you tell me the background about
> it?

Serializing data to a file hurts performance, so I tried to make it unlikely to
happen unless the PA is really blocked by another PA or BA.

> 09. parallel_apply_send_data()
> 
> ```
>   

RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-06 Thread houzj.f...@fujitsu.com
On Saturday, November 5, 2022 1:43 PM Amit Kapila 
> 
> On Fri, Nov 4, 2022 at 7:35 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > On Friday, November 4, 2022 4:07 PM Amit Kapila
>  wrote:
> > >
> > > On Thu, Nov 3, 2022 at 6:36 PM houzj.f...@fujitsu.com
> > >  wrote:
> > > >
> > > > Thanks for the analysis and summary !
> > > >
> > > > I tried to implement the above idea and here is the patch set.
> > > >
> > >
> > > Few comments on v42-0001
> > > ===
> >
> > Thanks for the comments.
> >
> > >
> > > 10.
> > > + winfo->shared->stream_lock_id = parallel_apply_get_unique_id();
> > > + winfo->shared->transaction_lock_id = parallel_apply_get_unique_id();
> > >
> > > Why can't we use xid (remote_xid) for one of these and local_xid
> > > (one generated by parallel apply) for the other?
> ...
> ...
> >
> > I also considered using xid for these locks, but it seems the objsubid
> > for the shared object lock is 16 bits while xid is 32 bits. So, I tried
> > to generate a unique 16-bit id here.
> >
> 
> Okay, I see your point. Can we think of having a new lock tag for this with
> classid, objid, objsubid for the first three fields of the locktag? We can use
> a new macro SET_LOCKTAG_APPLY_TRANSACTION and a common function to set the
> tag and acquire the lock. One more point related to this is that I am
> suggesting classid by referring to SET_LOCKTAG_OBJECT as that is used in the
> current patch, but do you think we need it for our purpose? Can't subscription
> id and xid uniquely identify the tag?

I agree that it could be better to have a new lock tag. Another point is that
the remote xid and local xid could be the same in some rare cases, so I think
we might need to add another identifier to make it unique.

Maybe:
locktag_field1 : subscription oid
locktag_field2 : xid(remote or local)
locktag_field3 : 0(lock for stream block)/1(lock for transaction)
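
The three-field layout above could be sketched roughly as follows. This is only
a standalone illustration, not PostgreSQL's real LOCKTAG: the ApplyLockTag
struct, the APPLY_LOCK_* constants and both helper functions are hypothetical
names, and I am assuming that the stream-block lock and the transaction lock
are the two cases that may end up with the same xid value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical lock kinds for field3; the values follow the list above. */
enum { APPLY_LOCK_STREAM = 0, APPLY_LOCK_TRANSACTION = 1 };

/* Simplified three-field tag; not PostgreSQL's real LOCKTAG struct. */
typedef struct ApplyLockTag
{
    uint32_t field1;    /* subscription oid */
    uint32_t field2;    /* xid (remote or local) */
    uint32_t field3;    /* 0 = stream block, 1 = transaction */
} ApplyLockTag;

/* Fill in a tag from its three components. */
static ApplyLockTag
make_apply_lock_tag(uint32_t subid, uint32_t xid, uint32_t kind)
{
    ApplyLockTag tag;

    assert(kind == APPLY_LOCK_STREAM || kind == APPLY_LOCK_TRANSACTION);
    tag.field1 = subid;
    tag.field2 = xid;
    tag.field3 = kind;
    return tag;
}

/* Two tags name the same lock only if all three fields match. */
static bool
apply_lock_tag_equal(ApplyLockTag a, ApplyLockTag b)
{
    return a.field1 == b.field1 &&
           a.field2 == b.field2 &&
           a.field3 == b.field3;
}
```

Even when the two xids happen to be equal, field3 keeps the stream-block tag
and the transaction tag apart.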

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-11-04 Thread houzj.f...@fujitsu.com
On Friday, November 4, 2022 4:07 PM Amit Kapila  wrote:
> 
> On Thu, Nov 3, 2022 at 6:36 PM houzj.f...@fujitsu.com
>  wrote:
> >
> > Thanks for the analysis and summary !
> >
> > I tried to implement the above idea and here is the patch set.
> >
> 
> Few comments on v42-0001
> ===

Thanks for the comments.

> 
> 10.
> + winfo->shared->stream_lock_id = parallel_apply_get_unique_id();
> + winfo->shared->transaction_lock_id = parallel_apply_get_unique_id();
> 
> Why can't we use xid (remote_xid) for one of these and local_xid (one
> generated by parallel apply) for the other? I was a bit worried about the
> local_xid because it will be generated only after applying the first message,
> but the patch already seems to be waiting for it in
> parallel_apply_wait_for_xact_finish as seen in the below code.
> 
> +void
> +parallel_apply_wait_for_xact_finish(ParallelApplyWorkerShared *wshared)
> +{
> + /*
> + * Wait until the parallel apply worker handles the first message and
> + * set the flag to true.
> + */
> + parallel_apply_wait_for_in_xact(wshared, PARALLEL_TRANS_STARTED);
> +
> + /* Wait for the transaction lock to be released. */
> + parallel_apply_lock(wshared->transaction_lock_id);

I also considered using xid for these locks, but it seems the objsubid for the
shared object lock is 16 bits while xid is 32 bits. So, I tried to generate a
unique 16-bit id here. I will think more on this, and maybe I need to add some
comments to explain this.
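
As a small standalone illustration of why a 32-bit xid cannot simply be stored
in a 16-bit slot, the sketch below truncates xids to their low 16 bits and
checks for collisions. The function names are hypothetical and exist only for
this example; nothing here is taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Keep only the low 16 bits, as a 16-bit objsubid slot would. */
static uint16_t
xid_to_objsubid(uint32_t xid)
{
    return (uint16_t) (xid & 0xFFFFu);
}

/* True when two distinct xids would map to the same 16-bit lock id. */
static bool
xids_collide(uint32_t a, uint32_t b)
{
    assert(a != b);
    return xid_to_objsubid(a) == xid_to_objsubid(b);
}
```

Any two xids that differ by a multiple of 65536 would share one lock id, which
is why a separately generated unique id (or a wider lock tag field) is needed.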

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-10-21 Thread houzj.f...@fujitsu.com
On Wednesday, October 19, 2022 8:50 PM Kuroda, Hayato/黒田 隼人 wrote:
> 
> ===
> 01. applyparallelworker.c - SIZE_STATS_MESSAGE
> 
> ```
> /*
>  * There are three fields in each message received by the parallel apply
>  * worker: start_lsn, end_lsn and send_time. Because we have updated these
>  * statistics in the leader apply worker, we can ignore these fields in the
>  * parallel apply worker (see function LogicalRepApplyLoop).
>  */
> #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
> ```
> 
> According to other comment styles, it seems that the first sentence of the
> comment should represent the datatype and usage, not the detailed reason.
> For example, about ParallelApplyWorkersList, you said "A list ...". How about
> adding a message like the following:
> The message size that can be skipped by parallel apply worker

Thanks for the comments, but the current description seems enough to me.

> ~~~
> 02. applyparallelworker.c - parallel_apply_start_subtrans
> 
> ```
>   if (current_xid != top_xid &&
>   !list_member_xid(subxactlist, current_xid))
> ```
> 
> A macro TransactionIdEquals is defined in access/transam.h. Should we use it,
> or is it too trivial?

I checked the existing code, and it seems both styles are being used.
Maybe we can post a separate patch to replace them later.

> ~~~
> 08. worker.c - apply_handle_prepare_internal
> 
> Same as above.
> 
> 
> ~~~
> 09. worker.c - maybe_reread_subscription
> 
> ```
>   /*
>    * Exit if any parameter that affects the remote connection was changed.
>    * The launcher will start a new worker.
>    */
>   if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
>   strcmp(newsub->name, MySubscription->name) != 0 ||
>   strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
>   newsub->binary != MySubscription->binary ||
>   newsub->stream != MySubscription->stream ||
>   strcmp(newsub->origin, MySubscription->origin) != 0 ||
>   newsub->owner != MySubscription->owner ||
>   !equal(newsub->publications, MySubscription->publications))
>   {
>   ereport(LOG,
>   (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
>   MySubscription->name)));
> 
>   proc_exit(0);
>   }
> ```
> 
> When the parallel apply worker has been launched and then the subscription
> option has been modified, the same message will appear twice.
> But if the option "streaming" is changed from "parallel" to "on", one of them
> will not restart again.
> Should we modify the message?

Thanks. It seems to be a timing problem: if the leader catches the change first
and stops the parallel workers, the message will only appear once. But I agree
we'd better make the message clear. I changed the message in the parallel apply
worker. While at it, I also adjusted some other messages to include "parallel
apply worker" if they are emitted in a parallel apply worker.

Best regards,
Hou zj



RE: Perform streaming logical transactions by background workers and parallel apply

2022-10-20 Thread houzj.f...@fujitsu.com
On Wednesday, October 19, 2022 8:50 PM Kuroda, Hayato/黒田 隼人 wrote:

Thanks for the comments.

> 03. applyparallelworker.c - LogicalParallelApplyLoop
> 
> ```
>   case SHM_MQ_WOULD_BLOCK:
>   {
>   int rc;
> 
>   if (!in_streamed_transaction)
>   {
>   /*
>    * If we didn't get any transactions for a while there might be
>    * unconsumed invalidation messages in the queue, consume them
>    * now.
>    */
>   AcceptInvalidationMessages();
>   maybe_reread_subscription();
>   }
> 
>   MemoryContextReset(ApplyMessageContext);
> ```
> 
> Is MemoryContextReset() needed? IIUC no one uses ApplyMessageContext if we
> reach here.

I was concerned that some code at a deeper level might allocate memory, as a
lot of code and many functions could be invoked in the loop (for example,
the functions in ProcessInterrupts()). Although it might not matter in
practice, I think it is better to reset here to make it robust. Besides,
the code is consistent with the logic in LogicalRepApplyLoop.

> 04. applyparallelworker.c - HandleParallelApplyMessages
> 
> ```
>   else if (res == SHM_MQ_SUCCESS)
>   {
>   StringInfoData msg;
> 
>   initStringInfo(&msg);
>   appendBinaryStringInfo(&msg, data, nbytes);
>   HandleParallelApplyMessage(winfo, &msg);
>   pfree(msg.data);
>   }
> ```
> 
> In LogicalParallelApplyLoop(), appendBinaryStringInfo() is not used; the
> StringInfoData is initialized directly. Why is there a difference? The
> function will do repalloc() and memcpy(), so it may be inefficient.

I think both styles are fine. The code in HandleParallelApplyMessages is kept
consistent with the similar function HandleParallelMessages(), which is not a
performance-sensitive function.


> 05. applyparallelworker.c - parallel_apply_send_data
> 
> ```
>   if (result != SHM_MQ_SUCCESS)
>   ereport(ERROR,
>   (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>    errmsg("could not send data to shared-memory queue")));
> 
> ```
> 
> I checked the enumeration of shm_mq_result, and I felt that shm_mq_send(nowait
> = false) fails only when the opposite process has exited. How about adding a
> hint or detailed message like "lost connection to parallel apply worker"?

Thanks for analyzing, but I am not sure if "lost connection to xx worker" is an
appropriate errhint or detail. The current error message looks clear to me.


> 07. worker.c - apply_handle_commit_internal
> 
> I think we can add an assertion like Assert(replorigin_session_origin_lsn !=
> InvalidXLogRecPtr && replorigin_session_origin != InvalidRepOriginId), to
> avoid missing replorigin_session_setup. Previously it was set at the entry
> point and never reset.

I feel adding the assert for replorigin_session_origin is fine here. For
replorigin_session_origin_lsn, I am not sure it is better to check here, as
we would need to distinguish the cases of streaming=on and streaming=parallel
if we want to check that.


> 10. general
> 
> IIUC parallel apply workers could not detect the deadlock automatically,
> right? I thought we might be able to use the heartbeat protocol between a
> leader worker and parallel workers.
>  
> You have already implemented a mechanism to send and receive messages between
> workers. My idea is that each parallel apply worker records a timestamp that
> gets a message from the leader and if a certain time (30s?) has passed it
> sends a heartbeat message like 'H'. The leader consumes 'H' and sends a reply
> like LOGICAL_REP_MSG_KEEPALIVE in HandleParallelApplyMessage(). If the
> parallel apply worker does not receive any message for more than one minute,
> it regards that the deadlock has occurred and can change the retry flag to on
> and exit.
> 
> The above assumes that the leader cannot reply to the message while waiting
> for the lock. Moreover, it may have notable overhead and we must use a new
> logical replication message type.
> 
> What do you think? Have you already considered it?

Thanks for the suggestion. But we are trying to detect this kind of problem
before the problematic case happens, and to disallow parallelism in these cases
by checking the unique/constr/partitioned... in the 0003/0004 patches.

About the keepalive design. We could do that, but the leader 

RE: list of TransactionIds

2022-10-20 Thread houzj.f...@fujitsu.com
On Monday, July 4, 2022 9:27 PM Alvaro Herrera  wrote:

Hi,

> 
> Pushed now, to master only.

Thanks for introducing these APIs!

While trying to use the newly introduced list_member_xid(), I noticed that it
internally uses lfirst_oid instead of lfirst_xid. It works OK for now, but in
case we change xid to 64 bits in the future, I think we'd better use lfirst_xid
here.

Here is a tiny patch to fix that.
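
To show the kind of problem this guards against, here is a standalone sketch
that assumes a hypothetical 64-bit xid. It does not use the real List API:
SketchOid, SketchXid and both member functions are made-up names, and the
plain array stands in for the list cells.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t SketchOid;   /* 32-bit, like Oid today */
typedef uint64_t SketchXid;   /* hypothetical widened xid */

/* Membership test that views each cell through the 32-bit Oid type. */
static bool
member_via_oid(const SketchXid *cells, size_t n, SketchXid datum)
{
    assert(cells != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
        if ((SketchOid) cells[i] == (SketchOid) datum)
            return true;
    return false;
}

/* Membership test that compares the full xid value. */
static bool
member_via_xid(const SketchXid *cells, size_t n, SketchXid datum)
{
    assert(cells != NULL || n == 0);
    for (size_t i = 0; i < n; i++)
        if (cells[i] == datum)
            return true;
    return false;
}
```

With 32-bit xids both functions agree, which is why the current code works.
Once the upper 32 bits can be non-zero, the Oid-based test reports false
matches for values that differ only in those upper bits.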

Best regards,
Hou Zhijie


0001-use-proper-macros-to-access-xid.patch
Description: 0001-use-proper-macros-to-access-xid.patch


RE: Improve errhint for ALTER SUBSCRIPTION ADD/DROP PUBLICATION

2022-10-18 Thread houzj.f...@fujitsu.com
On Tuesday, October 18, 2022 5:50 PM Alvaro Herrera wrote:
> 
> On 2022-Oct-18, Japin Li wrote:
> 
> >
> > On Tue, 18 Oct 2022 at 12:00, houzj.f...@fujitsu.com
>  wrote:
> >
> > > Agreed. Here is new version patch which changed the error code and
> > > moved the whole command out of the message according to Álvaro's
> comment.
> >
> > My bad!  The patch looks good to me.
> 
> Thank you, I pushed it to both branches, because I realized we were saying
> "SET PUBLICATION" when we meant "ADD/DROP"; that hint could be quite
> damaging if anybody decides to actually follow it ISTM.

Thanks for pushing!

Best regards,
Hou zj


RE: CF Bot failure in wait_for_subscription_sync()

2022-10-18 Thread houzj.f...@fujitsu.com
On Tuesday, October 18, 2022 2:16 PM Bharath Rupireddy wrote:
> 
> Hi,
> 
> I have seen 2 patches registered in CF failing on Linux - Debian Bullseye in
> wait_for_subscription_sync(). It seems like the tables aren't being synced. I
> have not done any further analysis. I'm not sure if this issue is being
> discussed elsewhere.
> 
> # Postmaster PID for node "twoways" is 50208
> Waiting for all subscriptions in "twoways" to synchronize data
> [14:12:43.092](198.391s) # poll_query_until timed out executing this query:
> # SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');
> # expecting this output:
> # t
> # last actual query output:
> # f
> # with stderr:
> timed out waiting for subscriber to synchronize data at t/100_bugs.pl line 147.
> 
> https://api.cirrus-ci.com/v1/artifact/task/6618623857917952/log/src/test/subscription/tmp_check/log/regress_log_100_bugs
> https://cirrus-ci.com/task/6618623857917952
> https://cirrus-ci.com/task/5764058174455808

Thanks for reporting this. I am not sure about the root cause, but I will share
some initial analysis here.

This test case waits for table sync to finish for both table "t" and table "t2".
But in the log, I can only see the message [1] related to the table sync of table
"t". So it seems that the table sync worker for table "t2" was never started
for some reason. I tried it locally but have not reproduced this yet.

[1]---
2022-10-17 10:16:37.216 UTC [48051][logical replication worker] LOG:  logical replication table synchronization worker for subscription "testsub", table "t" has finished
---

Best regards,
Hou zj


RE: Perform streaming logical transactions by background workers and parallel apply

2022-10-17 Thread houzj.f...@fujitsu.com
On Tuesday, October 18, 2022 10:36 AM Peter Smith  wrote:
> 
> Hi, here are my review comments for patch v38-0001.

Thanks for the comments.

> ~~~
> 
> 12. get_transaction_apply_action
> 
> I still felt like there should be some tablesync checks/comments in
> this function, just for sanity, even if it works as-is now.
> 
> For example, are you saying ([3] #22b) that there might be rare cases
> where a Tablesync would call to parallel_apply_find_worker? That seems
> strange, given that "for streaming transactions that are being applied
> in the parallel ... we disallow applying changes on a table that is
> not in the READY state".
> 
> --

I think that because we won't try to start a parallel apply worker in a table
sync worker (see the check in parallel_apply_can_start()), we won't find any
worker in parallel_apply_find_worker(), which means get_transaction_apply_action
will return TRANS_LEADER_SERIALIZE. And get_transaction_apply_action is a
function which can be invoked for all kinds of workers (the same is true for all
apply_handle_xxx functions), so I am not sure a table sync check/comment is
necessary.
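
The reasoning above can be sketched as a small standalone decision function.
This is a simplified model, not the patch code: the WorkerState struct and
sketch_apply_action are hypothetical, the order of the checks is my assumption,
and TRANS_LEADER_PARTIAL_SERIALIZE is left out to keep the example short.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    TRANS_LEADER_APPLY,
    TRANS_LEADER_SERIALIZE,
    TRANS_LEADER_SEND_TO_PARALLEL,
    TRANS_PARALLEL_APPLY
} TransApplyAction;

/* Hypothetical summary of the state the decision depends on. */
typedef struct WorkerState
{
    bool in_streamed_xact;      /* is a streamed transaction in progress? */
    bool am_parallel_worker;    /* running as a parallel apply worker? */
    bool found_parallel_worker; /* did the lookup find an assigned worker? */
} WorkerState;

/* Pick the action for the current change from the summarized state. */
static TransApplyAction
sketch_apply_action(const WorkerState *st)
{
    assert(st != NULL);

    if (!st->in_streamed_xact)
        return TRANS_LEADER_APPLY;
    if (st->am_parallel_worker)
        return TRANS_PARALLEL_APPLY;
    if (st->found_parallel_worker)
        return TRANS_LEADER_SEND_TO_PARALLEL;

    /* No parallel worker was found, e.g. in a table sync worker. */
    return TRANS_LEADER_SERIALIZE;
}
```

A table sync worker never starts a parallel apply worker, so the lookup finds
nothing and the function falls through to TRANS_LEADER_SERIALIZE without any
explicit table sync check.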

Best regards,
Hou zj


RE: Improve errhint for ALTER SUBSCRIPTION ADD/DROP PUBLICATION

2022-10-17 Thread houzj.f...@fujitsu.com
On Monday, October 17, 2022 6:14 PM Amit Kapila  wrote:
> 
> On Mon, Oct 17, 2022 at 2:41 PM Alvaro Herrera 
> wrote:
> >
> > On 2022-Oct-17, Peter Smith wrote:
> >
> > > On Mon, Oct 17, 2022 at 6:43 PM Alvaro Herrera
>  wrote:
> >
> > > > I'm not sure that ERRCODE_SYNTAX_ERROR is the right thing here;
> > > > sounds like ERRCODE_FEATURE_NOT_SUPPORTED might be more
> appropriate.
> > >
> > > I thought maybe ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE, which
> > > would make it the same as similar messages in the same function when
> > > incompatible parameters are specified.
> >
> > Hmm, yeah, I guess that's also a possibility.
> >
> 
> Right, ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE seems to suit better
> here.

Agreed. Here is a new version of the patch, which changes the error code and
moves the whole command out of the message according to Álvaro's comment.

Best regards,
Hou zj



v2-0001-Improve-errhint-for-ALTER-SUBSCRIPTION-ADD-DROP-PUBL.patch
Description:  v2-0001-Improve-errhint-for-ALTER-SUBSCRIPTION-ADD-DROP-PUBL.patch


Improve errhint for ALTER SUBSCRIPTION ADD/DROP PUBLICATION

2022-10-16 Thread houzj.f...@fujitsu.com
Hi,

While working on some logical replication related features, I found that the
HINT message could be improved when I tried to add a publication to a
subscription which was disabled.

alter subscription sub add publication pub2;
--
ERROR: ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions
HINT: Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).
--

Because I was executing the ADD PUBLICATION command, I feel the hint should
also mention it instead of SET PUBLICATION.
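
One way to picture the idea is a hint that is built from the command the user
actually ran. The sketch below is standalone and only illustrative: PubAction,
pub_action_name and build_refresh_hint are hypothetical names, and the attached
patch may word or structure the hint differently.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical tag for which ALTER SUBSCRIPTION variant was executed. */
typedef enum { PUB_SET, PUB_ADD, PUB_DROP } PubAction;

/* Map the variant to the keyword that should appear in the hint. */
static const char *
pub_action_name(PubAction action)
{
    switch (action)
    {
        case PUB_ADD:
            return "ADD";
        case PUB_DROP:
            return "DROP";
        case PUB_SET:
            break;
    }
    return "SET";
}

/* Write the hint text for the given variant into buf. */
static int
build_refresh_hint(PubAction action, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);
    return snprintf(buf, buflen,
                    "Use ALTER SUBSCRIPTION ... %s PUBLICATION ... WITH (refresh = false).",
                    pub_action_name(action));
}
```

For the ADD PUBLICATION case above, the hint would then name ADD PUBLICATION
rather than SET PUBLICATION.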

Best regards,
Hou zj




0001-Improve-errhint-for-ALTER-SUBSCRIPTION-ADD-DROP-PUBL.patch
Description:  0001-Improve-errhint-for-ALTER-SUBSCRIPTION-ADD-DROP-PUBL.patch

