On 21/03/17 18:54, Robert Haas wrote: > On Mon, Mar 20, 2017 at 7:56 PM, Petr Jelinek > <petr.jeli...@2ndquadrant.com> wrote: >> On 18/03/17 13:31, Petr Jelinek wrote: >>> On 07/03/17 06:23, Petr Jelinek wrote: >>>> there has been discussion at the logical replication initial copy thread >>>> [1] about making apply work with sync commit off by default for >>>> performance reasons and adding option to change that per subscription. >>>> >>>> Here I propose patch to implement this - it adds boolean column >>>> subssynccommit to pg_subscription catalog which decides if >>>> synchronous_commit should be off or local for apply. And it adds >>>> SYNCHRONOUS_COMMIT = boolean to the list of WITH options for CREATE and >>>> ALTER SUBSCRIPTION. When nothing is specified it will set it to false. >>>> >>>> The patch is built on top of copy patch currently as there are conflicts >>>> between the two and this helps a bit with testing of copy patch. >>>> >>>> [1] >>>> https://www.postgresql.org/message-id/CA+TgmoY7Lk2YKArcp4O=Qu=xoor8j71mad1oteojawmuje3...@mail.gmail.com >>>> >>> >>> I rebased this patch against recent changes and the latest version of >>> copy patch. >> >> And another rebase after pg_dump tests commit. > > + else if (strcmp(defel->defname, "nosynchronous_commit") == 0 > && synchronous_commit) > + { > + if (synchronous_commit_given) > + ereport(ERROR, > + (errcode(ERRCODE_SYNTAX_ERROR), > + errmsg("conflicting or redundant options"))); > + > + synchronous_commit_given = true; > + *synchronous_commit = !defGetBoolean(defel); > + } > > Uh, what's this nosynchronous_commit thing?
Ah, originally I didn't have it as a bool, just as (no)synchronous_commit; I forgot to rip this out. > > + <literal>local</literal> otherwise to <literal>false</literal>. The > + default value is <literal>false</literal> independently of the default > + <literal>synchronous_commit</literal> setting for the instance. > > This phrasing isn't very clear or accurate, IMHO. I'd say something > like "The value of this parameter overrides the synchronous_commit > setting. The default value is false." And I'd make the word > synchronous_commit in that sentence a link to the GUC, so that it's > absolutely unmistakable what we mean by "the synchronous_commit > setting". Okay. > > /* > + * We need to make new connection to new slot if slot name has changed > + * so exit here as well if that's the case. > + */ > + if (strcmp(newsub->slotname, MySubscription->slotname) != 0) > + { > + ereport(LOG, > + (errmsg("logical replication worker for subscription > \"%s\" will " > + "restart because the replication slot name > was changed", > + MySubscription->name))); > + > + walrcv_disconnect(wrconn); > + proc_exit(0); > + } > > Looks unrelated. > Oops, I need to fix this separately. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
From 1234142533027e411a2ea5feff60f782402cefe2 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmo...@pjmodos.net> Date: Mon, 6 Mar 2017 13:07:45 +0100 Subject: [PATCH] Add option to modify sync commit per subscription This also changes default behaviour of subscription workers to synchronous_commit = off --- doc/src/sgml/catalogs.sgml | 11 +++++++ doc/src/sgml/ref/alter_subscription.sgml | 1 + doc/src/sgml/ref/create_subscription.sgml | 12 ++++++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/commands/subscriptioncmds.c | 49 ++++++++++++++++++++++++------ src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/worker.c | 10 ++++++ src/bin/pg_dump/pg_dump.c | 12 +++++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/pg_dump/t/002_pg_dump.pl | 2 +- src/bin/psql/describe.c | 5 ++- src/include/catalog/pg_subscription.h | 11 ++++--- src/test/regress/expected/subscription.out | 27 ++++++++-------- src/test/regress/sql/subscription.sql | 3 +- 14 files changed, 115 insertions(+), 32 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 228ec78..f71d9c9 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6395,6 +6395,17 @@ </row> <row> + <entry><structfield>subsynccommit</structfield></entry> + <entry><type>bool</type></entry> + <entry></entry> + <entry> + If true, the apply for the subscription will run with + <literal>synchronous_commit</literal> set to <literal>local</literal>. + Otherwise it will have it set to <literal>false</literal>. 
+ </entry> + </row> + + <row> <entry><structfield>subconninfo</structfield></entry> <entry><type>text</type></entry> <entry></entry> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6335e17..712de98 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -26,6 +26,7 @@ ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> WITH ( <rep <phrase>where <replaceable class="PARAMETER">suboption</replaceable> can be:</phrase> SLOT NAME = slot_name + | SYNCHRONOUS_COMMIT = boolean ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> SET PUBLICATION publication_name [, ...] { REFRESH WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) | NOREFRESH } ALTER SUBSCRIPTION <replaceable class="PARAMETER">name</replaceable> REFRESH PUBLICATION WITH ( <replaceable class="PARAMETER">puboption</replaceable> [, ... ] ) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 6468470..6e8a676 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -29,6 +29,7 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl | CREATE SLOT | NOCREATE SLOT | SLOT NAME = slot_name | COPY DATA | NOCOPY DATA + | SYNCHRONOUS_COMMIT = boolean | NOCONNECT </synopsis> </refsynopsisdiv> @@ -145,6 +146,17 @@ CREATE SUBSCRIPTION <replaceable class="PARAMETER">subscription_name</replaceabl </varlistentry> <varlistentry> + <term><literal>SYNCHRONOUS_COMMIT = <replaceable class="parameter">boolean</replaceable></literal></term> + <listitem> + <para> + The value of this parameter overrides the + <xref linkend="guc-synchronous-commit">synchronous_commit setting. + The default value is <literal>false</literal>. 
+ </para> + </listitem> + </varlistentry> + + <varlistentry> <term>NOCONNECT</term> <listitem> <para> diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 9b74892..26921aa 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -68,6 +68,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->name = pstrdup(NameStr(subform->subname)); sub->owner = subform->subowner; sub->enabled = subform->subenabled; + sub->synccommit = subform->subsynccommit; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index cba2d5c..69512ea 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -60,12 +60,13 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name, - bool *copy_data) + bool *copy_data, bool *synchronous_commit) { ListCell *lc; bool connect_given = false; bool create_slot_given = false; bool copy_data_given = false; + bool synchronous_commit_given = false; if (connect) *connect = true; @@ -80,6 +81,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *slot_name = NULL; if (copy_data) *copy_data = true; + if (synchronous_commit) + *synchronous_commit = false; /* Parse options */ foreach (lc, options) @@ -165,6 +168,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, copy_data_given = true; *copy_data = !defGetBoolean(defel); } + else if (strcmp(defel->defname, "synchronous_commit") == 0 && synchronous_commit) + { + if (synchronous_commit_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + synchronous_commit_given = true; + *synchronous_commit = defGetBoolean(defel); 
+ } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -269,6 +282,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + bool synchronous_commit; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -280,7 +294,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, &connect, &enabled_given, - &enabled, &create_slot, &slotname, &copy_data); + &enabled, &create_slot, &slotname, &copy_data, + &synchronous_commit); /* * Since creating a replication slot is not transactional, rolling back @@ -330,6 +345,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) DirectFunctionCall1(namein, CStringGetDatum(stmt->subname)); values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); + values[Anum_pg_subscription_subsynccommit - 1] = + BoolGetDatum(synchronous_commit); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); values[Anum_pg_subscription_subslotname - 1] = @@ -581,14 +598,26 @@ AlterSubscription(AlterSubscriptionStmt *stmt) { case ALTER_SUBSCRIPTION_OPTIONS: { - char *slot_name; + char *slot_name; + bool synchronous_commit; + Form_pg_subscription form; + + form = (Form_pg_subscription) GETSTRUCT(tup); + synchronous_commit = form->subsynccommit; parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, &slot_name, NULL); + NULL, &slot_name, NULL, + &synchronous_commit); - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; + if (slot_name) + { + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + } + 
values[Anum_pg_subscription_subsynccommit - 1] = + BoolGetDatum(synchronous_commit); + replaces[Anum_pg_subscription_subsynccommit - 1] = true; update_tuple = true; break; @@ -601,7 +630,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL); + NULL, NULL, NULL); Assert(enabled_given); values[Anum_pg_subscription_subenabled - 1] = @@ -626,7 +655,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, &copy_data); + NULL, NULL, &copy_data, NULL); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -652,7 +681,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Subscription *sub = GetSubscription(subid, false); parse_subscription_options(stmt->options, NULL, NULL, NULL, - NULL, NULL, &copy_data); + NULL, NULL, &copy_data, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 255b225..0d3ec27 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -126,7 +126,7 @@ get_subscription_list(void) */ oldcxt = MemoryContextSwitchTo(resultcxt); - sub = (Subscription *) palloc(sizeof(Subscription)); + sub = (Subscription *) palloc0(sizeof(Subscription)); sub->oid = HeapTupleGetOid(tup); sub->dbid = subform->subdbid; sub->owner = subform->subowner; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7de8756..5e4b9ba 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1411,6 +1411,11 @@ reread_subscription(void) MemoryContextSwitchTo(oldctx); + /* Change synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", + MySubscription->synccommit ? 
"local" : "off", + PGC_BACKEND, PGC_S_OVERRIDE); + if (started_tx) CommitTransactionCommand(); @@ -1478,6 +1483,11 @@ ApplyWorkerMain(Datum main_arg) MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); + /* Setup synchronous commit according to the user's wishes */ + SetConfigOption("synchronous_commit", + MySubscription->synccommit ? "local" : "off", + PGC_BACKEND, PGC_S_OVERRIDE); + if (!MySubscription->enabled) { ereport(LOG, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 86cb276..5b288a2 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3622,6 +3622,7 @@ getSubscriptions(Archive *fout) int i_subname; int i_rolname; int i_subenabled; + int i_subsynccommit; int i_subconninfo; int i_subslotname; int i_subpublications; @@ -3639,7 +3640,8 @@ getSubscriptions(Archive *fout) appendPQExpBuffer(query, "SELECT s.tableoid, s.oid, s.subname," "(%s s.subowner) AS rolname, s.subenabled, " - " s.subconninfo, s.subslotname, s.subpublications " + " s.subsynccommit, s.subconninfo, s.subslotname, " + " s.subpublications " "FROM pg_catalog.pg_subscription s " "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database" " WHERE datname = current_database())", @@ -3653,6 +3655,7 @@ getSubscriptions(Archive *fout) i_subname = PQfnumber(res, "subname"); i_rolname = PQfnumber(res, "rolname"); i_subenabled = PQfnumber(res, "subenabled"); + i_subsynccommit = PQfnumber(res, "subsynccommit"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subpublications = PQfnumber(res, "subpublications"); @@ -3670,6 +3673,8 @@ getSubscriptions(Archive *fout) subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); subinfo[i].subenabled = (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); + subinfo[i].subsynccommit = + (strcmp(PQgetvalue(res, i, i_subsynccommit), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); subinfo[i].subslotname = 
pg_strdup(PQgetvalue(res, i, i_subslotname)); subinfo[i].subpublications = @@ -3744,6 +3749,11 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) else appendPQExpBufferStr(query, ", DISABLED"); + if (subinfo->subsynccommit) + appendPQExpBufferStr(query, ", SYNCHRONOUS_COMMIT = true"); + else + appendPQExpBufferStr(query, ", SYNCHRONOUS_COMMIT = false"); + appendPQExpBufferStr(query, ", SLOT NAME = "); appendStringLiteralAH(query, subinfo->subslotname, fout); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a466527..5934eb0 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -604,6 +604,7 @@ typedef struct _SubscriptionInfo DumpableObject dobj; char *rolname; bool subenabled; + bool subsynccommit; char *subconninfo; char *subslotname; char *subpublications; diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index f749cd3..0923893 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -4223,7 +4223,7 @@ qr/CREATE TRANSFORM FOR integer LANGUAGE sql \(FROM SQL WITH FUNCTION pg_catalog CONNECTION \'dbname=doesnotexist\' PUBLICATION pub1 WITH (NOCONNECT);', regexp => qr/^ - \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (NOCONNECT, DISABLED, SLOT NAME = 'sub1');\E + \QCREATE SUBSCRIPTION sub1 CONNECTION 'dbname=doesnotexist' PUBLICATION pub1 WITH (NOCONNECT, DISABLED, SYNCHRONOUS_COMMIT = false, SLOT NAME = 'sub1');\E /xm, like => { binary_upgrade => 1, diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 61a3e2a..118a037 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5115,7 +5115,8 @@ describeSubscriptions(const char *pattern, bool verbose) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, + false, false}; if 
(pset.sversion < 100000) { @@ -5141,7 +5142,9 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { appendPQExpBuffer(&buf, + ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", + gettext_noop("Synchronous Commit"), gettext_noop("Conninfo")); } diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 0811880..62845e9 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -39,6 +39,7 @@ CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) BKI_SCHE bool subenabled; /* True if the subscription is enabled * (the worker should be running) */ + bool subsynccommit; /* Should apply use synchronous commit? */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ text subconninfo; /* Connection string to the publisher */ @@ -54,14 +55,15 @@ typedef FormData_pg_subscription *Form_pg_subscription; * compiler constants for pg_subscription * ---------------- */ -#define Natts_pg_subscription 7 +#define Natts_pg_subscription 8 #define Anum_pg_subscription_subdbid 1 #define Anum_pg_subscription_subname 2 #define Anum_pg_subscription_subowner 3 #define Anum_pg_subscription_subenabled 4 -#define Anum_pg_subscription_subconninfo 5 -#define Anum_pg_subscription_subslotname 6 -#define Anum_pg_subscription_subpublications 7 +#define Anum_pg_subscription_subsynccommit 5 +#define Anum_pg_subscription_subconninfo 6 +#define Anum_pg_subscription_subslotname 7 +#define Anum_pg_subscription_subpublications 8 typedef struct Subscription @@ -71,6 +73,7 @@ typedef struct Subscription char *name; /* Name of the subscription */ Oid owner; /* Oid of the subscription owner */ bool enabled; /* Indicates if the subscription is enabled */ + bool synccommit; /* Indicates if apply should use synchronous commit */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ List *publications; /* List of publication names 
to subscribe to */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index d8dc55a..42a4a3c 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -27,19 +27,19 @@ reset client_min_messages; CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (NOCONNECT); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+-------------+--------------------- - testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+-------------+--------------------+--------------------- + testsub | regress_subscription_user | f | {testpub} | f | dbname=doesnotexist (1 row) ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3 NOREFRESH; ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2'; \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+---------------------+---------------------- - testsub | regress_subscription_user | f | {testpub2,testpub3} | dbname=doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +---------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub | regress_subscription_user | f | {testpub2,testpub3} | f | dbname=doesnotexist2 (1 row) BEGIN; @@ -66,11 +66,12 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; ERROR: must be owner of subscription testsub RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; -\dRs - List of subscriptions - Name | Owner | Enabled | 
Publication --------------+---------------------------+---------+--------------------- - testsub_foo | regress_subscription_user | f | {testpub2,testpub3} +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Synchronous Commit | Conninfo +-------------+---------------------------+---------+---------------------+--------------------+---------------------- + testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | t | dbname=doesnotexist2 (1 row) -- rename back to keep the rest simple diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 62c99d8..8617654 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -47,8 +47,9 @@ ALTER SUBSCRIPTION testsub RENAME TO testsub_dummy; RESET ROLE; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; +ALTER SUBSCRIPTION testsub_foo WITH (SYNCHRONOUS_COMMIT = true); -\dRs +\dRs+ -- rename back to keep the rest simple ALTER SUBSCRIPTION testsub_foo RENAME TO testsub; -- 2.7.4
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers