On Fri, Jun 3, 2022 at 11:01 AM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Please find below my review comments for patch v17-0002:
>
> ======
>
> 1. Commit message
>
> This patch adds a new SUBSCRIPTION parameter "origin". It Specifies whether
> the subscription will request the publisher to only send changes that
> originated locally, or to send any changes regardless of origin.
>
> "It Specifies" -> "It specifies"

Modified

> ~~~
>
> 2. Commit message
>
> Usage:
> CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'
> PUBLICATION pub1 with (origin = local);
>
> "with" -> "WITH"

Modified

> ======
>
> 3. doc/src/sgml/catalogs.sgml
>
> + <para>
> + Possible origin values are <literal>local</literal>,
> + <literal>any</literal>, or <literal>NULL</literal> if none is specified.
> + If <literal>local</literal>, the subscription will request the
> + publisher to only send changes that originated locally. If
> + <literal>any</literal>, the publisher sends any changes regardless of
> + their origin.
> + </para></entry>
>
> Should this also mention that NULL (default) is equivalent to 'any'?
>
> SUGGESTION
> If <literal>any</literal> (or <literal>NULL</literal>), the publisher
> sends any changes regardless of their origin.

Modified

> ======
>
> 4. src/backend/catalog/pg_subscription.c
>
> @@ -72,6 +72,16 @@ GetSubscription(Oid subid, bool missing_ok)
>      sub->twophasestate = subform->subtwophasestate;
>      sub->disableonerr = subform->subdisableonerr;
>
> +    datum = SysCacheGetAttr(SUBSCRIPTIONOID,
> +                            tup,
> +                            Anum_pg_subscription_suborigin,
> +                            &isnull);
> +
> +    if (!isnull)
> +        sub->origin = TextDatumGetCString(datum);
> +    else
> +        sub->origin = NULL;
> +
>      /* Get conninfo */
>      datum = SysCacheGetAttr(SUBSCRIPTIONOID,
>                              tup,
>
> Missing comment like the nearby code has:
> /* Get origin */

Modified

> ======
>
> 5. src/backend/replication/logical/worker.c
>
> +/* Macro for comparing string fields that might be NULL */
> +#define equalstr(a, b) \
> +    (((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b))
> +
>
> Should that have some extra parens for the macro args?
> e.g. "strcmp((a), (b))"

Modified

> ~~~
>
> 6. src/backend/replication/logical/worker.c - maybe_reread_subscription
>
> @@ -3059,6 +3063,7 @@ maybe_reread_subscription(void)
>      strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
>      newsub->binary != MySubscription->binary ||
>      newsub->stream != MySubscription->stream ||
> +    equalstr(newsub->origin, MySubscription->origin) ||
>      newsub->owner != MySubscription->owner ||
>      !equal(newsub->publications, MySubscription->publications))
>  {
>
> Is that right? Shouldn't it say !equalstr(...)?

Modified

> ======
>
> 7. src/backend/replication/pgoutput/pgoutput.c - parse_output_parameters
>
> @@ -380,6 +382,16 @@ parse_output_parameters(List *options, PGOutputData *data)
>
>      data->two_phase = defGetBoolean(defel);
>  }
> + else if (strcmp(defel->defname, "origin") == 0)
> + {
> +     if (origin_option_given)
> +         ereport(ERROR,
> +                 (errcode(ERRCODE_SYNTAX_ERROR),
> +                  errmsg("conflicting or redundant options")));
> +     origin_option_given = true;
> +
> +     data->origin = defGetString(defel);
> + }
>
> Should this function also be validating that the origin parameter
> value is only permitted to be one of "local" or "any"?

Modified

> ~~~
>
> 8. src/backend/replication/pgoutput/pgoutput.c - pgoutput_origin_filter
>
> @@ -1698,12 +1710,20 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>  }
>
>  /*
> - * Currently we always forward.
> + * Return true if the data source (origin) is remote and user has requested
> + * only local data, false otherwise.
>   */
>  static bool
>  pgoutput_origin_filter(LogicalDecodingContext *ctx,
>                         RepOriginId origin_id)
>  {
> +    PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +
> +    if (data->origin &&
> +        (strcmp(data->origin, "local") == 0) &&
> +        origin_id != InvalidRepOriginId)
> +        return true;
> +
>      return false;
>  }
>
> 8a.
> Could rewrite the condition by putting the strcmp last so you can
> avoid doing unnecessary strcmp.
>
> e.g.
> + if (data->origin &&
> +     origin_id != InvalidRepOriginId &&
> +     strcmp(data->origin, "local") == 0)

I have avoided the strcmp entirely by caching the value locally, as suggested in 8b, so I have not made a separate change for this.

>
> 8b.
> I also wondered if it might be worth considering caching the origin
> parameter value when it was parsed so that you can avoid doing any
> strcmp at all during this function. Because otherwise this might be
> called millions of times, right?

Modified

> ======
>
> 9. src/include/catalog/pg_subscription.h
>
> @@ -87,6 +87,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
>
>      /* List of publications subscribed to */
>      text subpublications[1] BKI_FORCE_NOT_NULL;
> +
> +    /* Publish the data originated from the specified origin */
> +    text suborigin;
>  #endif
>  } FormData_pg_subscription;
>
> SUGGESTION (for the comment text)
> /* Only publish data originating from the specified origin */

Modified

> ~~~
>
> 10. src/include/catalog/pg_subscription.h - Subscription
>
> @@ -118,6 +121,9 @@ typedef struct Subscription
>      char *slotname;      /* Name of the replication slot */
>      char *synccommit;    /* Synchronous commit setting for worker */
>      List *publications;  /* List of publication names to subscribe to */
> +    char *origin;        /* Publish the data originated from the
> +                          * specified origin */
> +
>  } Subscription;
>
> 10a.
> Reword comment same as suggested in review comment #9

Modified

> 10b.
> Remove spurious blank line

Modified

> ======
>
> 11. src/include/replication/walreceiver.h
>
> @@ -183,6 +183,8 @@ typedef struct
>      bool streaming;  /* Streaming of large transactions */
>      bool twophase;   /* Streaming of two-phase transactions at
>                        * prepare time */
> +    char *origin;    /* Publish the data originated from the
> +                      * specified origin */
>  } logical;
>
> Reword comment same as suggested in review comment #9

Modified

> ======
>
> 12. src/test/subscription/t/032_origin.pl
>
> +# Test logical replication using origin option.
>
> # Test the CREATE SUBSCRIPTION 'origin' parameter

Modified

> ~~~
>
> 13. src/test/subscription/t/032_origin.pl
>
> +# check that the data published from node_C to node_B is not sent to node_A
> +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;");
> +is($result, qq(11
> +12), 'Remote data originating from another node (not the publisher) is not replicated when origin option is local'
> +);
>
> "option" -> "parameter"

Modified

Thanks for the comments. The v18 patch attached at [1] has the fixes for them.

[1] - https://www.postgresql.org/message-id/CALDaNm0Haovukx2q7Yd987Sm8fbQ0nsh8F0EWaO_qsw0uObGBQ%40mail.gmail.com

Regards,
Vignesh
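
For anyone following comments 5 to 8, the ideas discussed there can be sketched in standalone C roughly as below. This is only an illustration under stated assumptions, not the code in the v18 patch: SketchOutputData, sketch_parse_origin and sketch_origin_filter are hypothetical names, and the RepOriginId typedef and InvalidRepOriginId define are stand-ins so that the sketch compiles without the PostgreSQL headers. It shows the fully parenthesized equalstr macro, validating the origin value once at parse time, and caching the result so that the per-change filter does no strcmp at all.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins for the PostgreSQL definitions, for illustration only. */
typedef uint16_t RepOriginId;
#define InvalidRepOriginId 0

/* Comment 5: compare strings that might be NULL, with macro args parenthesized. */
#define equalstr(a, b) \
	(((a) != NULL && (b) != NULL) ? (strcmp((a), (b)) == 0) : (a) == (b))

/* Hypothetical plugin state: the parsed origin is cached as a flag (8b). */
typedef struct SketchOutputData
{
	bool		local_only;		/* true when origin = 'local' was requested */
} SketchOutputData;

/*
 * Comments 7 and 8b: validate the value once while parsing and cache it.
 * Returns false for anything other than "local" or "any"; a NULL value is
 * treated like "any". On failure the cached flag is left unchanged.
 */
static bool
sketch_parse_origin(SketchOutputData *data, const char *value)
{
	assert(data != NULL);

	if (value == NULL || strcmp(value, "any") == 0)
		data->local_only = false;
	else if (strcmp(value, "local") == 0)
		data->local_only = true;
	else
		return false;

	return true;
}

/*
 * Comment 8: return true (filter the change out) when only local data was
 * requested and the change carries a remote origin. No strcmp is needed here
 * because the decision was cached at parse time.
 */
static bool
sketch_origin_filter(const SketchOutputData *data, RepOriginId origin_id)
{
	assert(data != NULL);

	return data->local_only && origin_id != InvalidRepOriginId;
}
```

With origin = local, a change that carries a valid (non-zero) origin id is filtered out, while a change that originated locally (InvalidRepOriginId) is still sent. For comment 6, the reread check would treat the subscription as changed when !equalstr(newsub->origin, MySubscription->origin) holds.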