On Wed, Mar 30, 2022 at 7:22 PM Ashutosh Bapat
<ashutosh.bapat....@gmail.com> wrote:
>
> Hi Vignesh,
> Some review comments on 0001
>
> On Wed, Mar 16, 2022 at 11:17 AM vignesh C <vignes...@gmail.com> wrote:
>
> >
> > The changes for the same are available int the v5 patch available at [1].
> > [1] - 
> > https://www.postgresql.org/message-id/CALDaNm3wCf0YcvVo%2BgHMGpupk9K6WKJxCyLUvhPC2GkPKRZUWA%40mail.gmail.com
> >
>
>      cb->truncate_cb = pg_decode_truncate;
>      cb->commit_cb = pg_decode_commit_txn;
>      cb->filter_by_origin_cb = pg_decode_filter;
> +    cb->filter_remote_origin_cb = pg_decode_filter_remote_origin;
>
> Why do we need a new hook? Can we use filter_by_origin_cb? Also it looks like
> implementation of pg_decode_filter and pg_decode_filter_remote_origin is same,
> unless my eyes are deceiving me.

I have used filter_by_origin_cb for the implementation, removed
filter_remote_origin_cb

> -      <literal>binary</literal>, <literal>streaming</literal>, and
> -      <literal>disable_on_error</literal>.
> +      <literal>binary</literal>, <literal>streaming</literal>,
> +      <literal>disable_on_error</literal> and
> +      <literal>publish_local_only</literal>.
>
> "publish_local_only" as a "subscription" option looks odd. Should it be
> "subscribe_local_only"?

Modified

>
> +       <varlistentry>
> +        <term><literal>publish_local_only</literal>
> (<type>boolean</type>)</term>
> +        <listitem>
> +         <para>
> +          Specifies whether the subscription should subscribe only to the
> +          locally generated changes or subscribe to both the locally 
> generated
> +          changes and the replicated changes that was generated from other
> +          nodes in the publisher. The default is <literal>false</literal>.
>
> This description to uses verb "subscribe" instead of "publish".

Modified

> @@ -936,6 +951,13 @@ AlterSubscription(ParseState *pstate,
> AlterSubscriptionStmt *stmt,
>                          = true;
>                  }
>
> +                if (IsSet(opts.specified_opts, SUBOPT_PUBLISH_LOCAL_ONLY))
> +                {
> +                    values[Anum_pg_subscription_sublocal - 1] =
> +                        BoolGetDatum(opts.streaming);
>
> should this be opts.sublocal?

Yes you are right, Modified

>      cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
>      cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
>      cb->filter_by_origin_cb = pgoutput_origin_filter;
> +    cb->filter_remote_origin_cb = pgoutput_remote_origin_filter;
>
> pgoutput_origin_filter just returns false now. I think we should just enhance
> that function and reuse the callback, instead of adding a new callback.

Modified

> --- a/src/include/replication/logical.h
> +++ b/src/include/replication/logical.h
> @@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
>       */
>      bool        twophase_opt_given;
>
> +    bool        only_local;        /* publish only locally generated data */
> +
>
> If we get rid of the new callback, we won't need this new member as well.
> Anyway the filtering happens within the output plugin. There is nothing that
> core is required to do here.

Modified

> --- a/src/include/replication/walreceiver.h
> +++ b/src/include/replication/walreceiver.h
> @@ -183,6 +183,7 @@ typedef struct
>              bool        streaming;    /* Streaming of large transactions */
>              bool        twophase;    /* Streaming of two-phase transactions 
> at
>                                       * prepare time */
> +            bool        only_local; /* publish only locally generated data */
>
> Are we using this anywhere. I couldn't spot any usage of this member. I might
> be missing something though.

This is set in ApplyWorkerMain before calling libpqrcv_startstreaming.
This will be used in building the START_REPLICATION command.
Thanks for the comments, the attached v6 patch has the changes for the same.

Regards,
Vignesh
From 5be4b36317f8fd6cecd2c5f9d02e4c5f87026811 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Thu, 31 Mar 2022 17:08:35 +0530
Subject: [PATCH v6 1/2] Skip replication of non local data.

Add an option publish_local_only which will subscribe only to the locally
generated data in the publisher node. If subscriber is created with this
option, publisher will skip publishing the data that was subscribed
from other nodes. It can be created using following syntax:
ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=9999' PUBLICATION pub1 with (publish_local_only = on);
---
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  12 ++
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   4 +-
 src/backend/commands/subscriptioncmds.c       |  26 +++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   4 +
 src/backend/replication/logical/worker.c      |   2 +
 src/backend/replication/pgoutput/pgoutput.c   |  23 +++
 src/bin/pg_dump/pg_dump.c                     |  13 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   8 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/test/regress/expected/subscription.out    | 139 ++++++++++--------
 src/test/regress/sql/subscription.sql         |   7 +
 src/test/subscription/t/032_circular.pl       | 108 ++++++++++++++
 18 files changed, 289 insertions(+), 73 deletions(-)
 create mode 100644 src/test/subscription/t/032_circular.pl

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index fe13ab9a2d..8f667c916e 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, <literal>streaming</literal>, and
-      <literal>disable_on_error</literal>.
+      <literal>binary</literal>, <literal>streaming</literal>,
+      <literal>disable_on_error</literal>, and
+      <literal>subscribe_local_only</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index ebf7db57c5..36dfbc888b 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -152,6 +152,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>subscribe_local_only</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription will request the publisher to send
+          locally generated changes or both the locally generated changes and
+          the replicated changes that was generated from other nodes. The
+          default is <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>slot_name</literal> (<type>string</type>)</term>
         <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 0ff0982f7b..8e0d0ccced 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
 	sub->skiplsn = subform->subskiplsn;
+	sub->only_local = subform->sublocal;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 9eaa51df29..673ef29301 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1286,8 +1286,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subdisableonerr, subskiplsn, subslotname,
-              subsynccommit, subpublications)
+              substream, subtwophasestate, subdisableonerr, sublocal,
+              subskiplsn, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 85dacbe93d..55c48af2c9 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -64,6 +64,7 @@
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
+#define SUBOPT_LOCAL_ONLY			0x00001000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
 	bool		streaming;
 	bool		twophase;
 	bool		disableonerr;
+	bool		only_local;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -138,6 +140,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->twophase = false;
 	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
 		opts->disableonerr = false;
+	if (IsSet(supported_opts, SUBOPT_LOCAL_ONLY))
+		opts->only_local = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -236,6 +240,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_LOCAL_ONLY) &&
+				 strcmp(defel->defname, "subscribe_local_only") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_LOCAL_ONLY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_LOCAL_ONLY;
+			opts->only_local = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -532,7 +545,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_LOCAL_ONLY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -602,6 +615,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+	values[Anum_pg_subscription_sublocal - 1] = BoolGetDatum(opts.only_local);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1019,7 +1033,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
+								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+								  SUBOPT_LOCAL_ONLY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1076,6 +1091,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_LOCAL_ONLY))
+				{
+					values[Anum_pg_subscription_sublocal - 1] =
+						BoolGetDatum(opts.only_local);
+					replaces[Anum_pg_subscription_sublocal - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a..b36d23d419 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -453,6 +453,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
 
+		if (options->proto.logical.only_local &&
+			PQserverVersion(conn->streamConn) >= 150000)
+			appendStringInfoString(&cmd, ", subscribe_local_only 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f3868b3e1f..8cd1622419 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3106,6 +3106,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->owner != MySubscription->owner ||
+		newsub->only_local != MySubscription->only_local ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
 		ereport(LOG,
@@ -3786,6 +3787,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 	options.proto.logical.twophase = false;
+	options.proto.logical.only_local = MySubscription->only_local;
 
 	if (!am_tablesync_worker())
 	{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 20d0b1e125..298ceb87d9 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -294,12 +294,15 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		sequences_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
+	bool		subscribe_local_only_option_given = false;
+
 
 	data->binary = false;
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
 	data->sequences = true;
+	data->only_local = false;
 
 	foreach(lc, options)
 	{
@@ -398,6 +401,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "subscribe_local_only") == 0)
+		{
+			if (subscribe_local_only_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			subscribe_local_only_option_given = true;
+
+			data->only_local = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -496,6 +509,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		else
 			ctx->twophase_opt_given = true;
 
+		if (data->only_local && data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("requested proto_version=%d does not support subscribe_local_only, need %d or higher",
+							data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
@@ -1773,6 +1792,10 @@ static bool
 pgoutput_origin_filter(LogicalDecodingContext *ctx,
 					   RepOriginId origin_id)
 {
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepOriginId)
+		return true;
 	return false;
 }
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 535b160165..4160642134 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4407,6 +4407,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_sublocal;
 	int			i,
 				ntups;
 
@@ -4451,11 +4452,13 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 150000)
 		appendPQExpBufferStr(query,
 							 " s.subtwophasestate,\n"
-							 " s.subdisableonerr\n");
+							 " s.subdisableonerr,\n"
+							 " s.sublocal\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%c' AS subtwophasestate,\n"
-						  " false AS subdisableonerr\n",
+						  " false AS subdisableonerr,\n"
+						  " false AS s.sublocal\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	appendPQExpBufferStr(query,
@@ -4483,6 +4486,7 @@ getSubscriptions(Archive *fout)
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
+	i_sublocal = PQfnumber(res, "sublocal");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4512,6 +4516,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+		subinfo[i].sublocal =
+			pg_strdup(PQgetvalue(res, i, i_sublocal));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4585,6 +4591,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subdisableonerr, "t") == 0)
 		appendPQExpBufferStr(query, ", disable_on_error = true");
 
+	if (strcmp(subinfo->sublocal, "f") != 0)
+		appendPQExpBufferStr(query, ", subscribe_local_only = on");
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 688093c55e..d40b54e081 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -664,6 +664,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *sublocal;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 4dddf08789..200c08d75e 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6280,7 +6280,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6318,9 +6318,11 @@ describeSubscriptions(const char *pattern, bool verbose)
 		if (pset.sversion >= 150000)
 			appendPQExpBuffer(&buf,
 							  ", subtwophasestate AS \"%s\"\n"
-							  ", subdisableonerr AS \"%s\"\n",
+							  ", subdisableonerr AS \"%s\"\n"
+							  ", sublocal AS \"%s\"\n",
 							  gettext_noop("Two phase commit"),
-							  gettext_noop("Disable on error"));
+							  gettext_noop("Disable on error"),
+							  gettext_noop("Only local"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 3f9dfffd57..9258b8dd01 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1882,7 +1882,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
+		COMPLETE_WITH("binary", "slot_name", "streaming", "subscribe_local_only", "synchronous_commit", "disable_on_error");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3167,7 +3167,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "enabled", "slot_name", "streaming",
+					  "enabled",  "slot_name", "streaming", "subscribe_local_only",
 					  "synchronous_commit", "two_phase", "disable_on_error");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 599c2e4422..38031bd587 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -66,6 +66,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	bool		sublocal;		/* skip copying of remote origin data */
+
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 	bool		subdisableonerr;	/* True if a worker error should cause the
@@ -109,6 +111,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	bool		only_local;		/* Skip copying of remote orgin data */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index f4e9f35d09..3407a1eff6 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -30,6 +30,7 @@ typedef struct PGOutputData
 	bool		messages;
 	bool		two_phase;
 	bool		sequences;
+	bool		only_local;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 92f73a55b8..65c83977a3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,6 +183,7 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		only_local; /* publish only locally generated data */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 7fcfad1591..17b991b1fc 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -70,16 +70,35 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ERROR:  cannot enable subscription that does not have a slot name
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 ERROR:  ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+-- ok - with subscribe_local_only = true
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, subscribe_local_only = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+ regress_testsub4
+                                                                                           List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | t          | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub4 SET (subscribe_local_only = false);
+\dRs+ regress_testsub4
+                                                                                           List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -96,10 +115,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                               List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -108,10 +127,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                               List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -143,10 +162,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                           List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                 List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | f          | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -179,19 +198,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -202,19 +221,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -229,10 +248,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                            List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -247,10 +266,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -284,10 +303,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -296,10 +315,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -308,10 +327,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -323,18 +342,18 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 74c38ead5d..badc00b7c7 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -54,7 +54,14 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU
 ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 
+-- ok - with subscribe_local_only = true
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, subscribe_local_only = true);
+\dRs+ regress_testsub4
+ALTER SUBSCRIPTION regress_testsub4 SET (subscribe_local_only = false);
+\dRs+ regress_testsub4
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
diff --git a/src/test/subscription/t/032_circular.pl b/src/test/subscription/t/032_circular.pl
new file mode 100644
index 0000000000..30e14bf727
--- /dev/null
+++ b/src/test/subscription/t/032_circular.pl
@@ -0,0 +1,108 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test circular logical replication.
+#
+# Includes tests for circulation replication using subscribe_local_only option.
+#
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###################################################
+# Setup a circulation replication of pub/sub nodes.
+# node_A -> node_B -> node_A
+###################################################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_B->start;
+
+# Create tables on node_A
+$node_A->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Create the same tables on node_B
+$node_B->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_B
+	CONNECTION '$node_A_connstr application_name=$appname_B'
+	PUBLICATION tap_pub_A
+	WITH (subscribe_local_only = on)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_A = 'tap_sub_A';
+$node_A->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_A
+	CONNECTION '$node_B_connstr application_name=$appname_A'
+	PUBLICATION tap_pub_B
+	WITH (subscribe_local_only = on, copy_data = off)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_A);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+is(1,1, "Circular replication setup is complete");
+
+my $result;
+
+##########################################################################
+# check that circular replication setup does not cause infinite recursive
+# insertion.
+##########################################################################
+
+# insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_A);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;");
+is($result, qq(11
+12), 'Inserted successfully without leading to infinite recursion in circular replication setup');
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
+is($result, qq(11
+12), 'Inserted successfully without leading to infinite recursion in circular replication setup');
+
+# shutdown
+$node_B->stop('fast');
+$node_A->stop('fast');
+
+done_testing();
-- 
2.32.0

From 04e6cbca836173f689906c77941bf120e860fe0a Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Fri, 1 Apr 2022 12:05:34 +0530
Subject: [PATCH v6 2/2] Support force option for copy_data, check and throw an
 error if publisher tables were also subscribing data in the publisher from
 other publishers.

This patch does couple of things:
1) Added force option for copy_data.
2) Check and throw an error if the publication tables were also subscribing
data in the publisher from other publishers.

Let's consider an existing Multi master logical replication setup between
Node1 and Node2 that is created using the following steps:
a) Node1 - Publication publishing employee table.
b) Node2 - Subscription subscribing from publication pub1 with
subscribe_local_only.
c) Node2 - Publication publishing employee table.
d) Node1 - Subscription subscribing from publication pub2 with
subscribe_local_only. Now when user is trying to add another node Node3 to the
above Multi master logical replication setup, user will have to create one
subscription subscribing from Node1 and another subscription subscribing from
Node2 in Node3 using subscribe_local_only option and copy_data as true, while
the subscription is created, server will identify that Node2 is subscribing
from Node1 and Node1 is subscribing from Node2 and throw an error so
that user can handle the initial copy data.
---
 doc/src/sgml/ref/alter_subscription.sgml   |   8 +-
 doc/src/sgml/ref/create_subscription.sgml  |  59 +++++++-
 src/backend/commands/subscriptioncmds.c    | 159 +++++++++++++++++----
 src/test/regress/expected/subscription.out |   4 +-
 src/test/regress/sql/subscription.sql      |   1 +
 src/test/subscription/t/032_circular.pl    |  24 +++-
 src/tools/pgindent/typedefs.list           |   1 +
 7 files changed, 220 insertions(+), 36 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 8f667c916e..7cb485b0ec 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -161,12 +161,14 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 
       <variablelist>
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be either <literal>true</literal>,
+          <literal>false</literal> or <literal>force</literal>. The default is
+          <literal>true</literal>.
          </para>
          <para>
           Previously subscribed tables and sequences are not copied, even if a
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 36dfbc888b..a8e1aedfbe 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -161,6 +161,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           the replicated changes that was generated from other nodes. The
           default is <literal>false</literal>.
          </para>
+         <para>
+          If the tables in the publication were also subscribing to the data in
+          the publisher from other publishers, it will affect the
+          <command>CREATE SUBSCRIPTION</command> based on the value specified
+          for <literal>copy_data</literal> option. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for details.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -213,18 +220,28 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
        </varlistentry>
 
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be either <literal>true</literal>,
+          <literal>false</literal> or <literal>force</literal>. The default is
+          <literal>true</literal>.           
          </para>
          <para>
           If the publications contain <literal>WHERE</literal> clauses, it
           will affect what data is copied. Refer to the
           <xref linkend="sql-createsubscription-notes" /> for details.
          </para>
+
+         <para>
+          If the tables in the publication were also subscribing to the data in
+          the publisher from other publishers, it will affect the
+          <command>CREATE SUBSCRIPTION</command> based on the value specified
+          for <literal>subscribe_local_only</literal> option. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for details.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -375,6 +392,42 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    can have non-existent publications.
   </para>
 
+  <para>
+   Let's consider an existing Multi master logical replication setup between
+   Node1 and Node2 that is created using the following steps:
+   a) Node1 - Publication publishing employee table.
+   b) Node2 - Subscription subscribing from publication pub1 with
+   <literal>subscribe_local_only</literal>.
+   c) Node2 - Publication publishing employee table.
+   d) Node1 - Subscription subscribing from publication pub2 with
+   <literal>subscribe_local_only</literal>.
+   Now when user is trying to add another node Node3 to the above Multi master
+   logical replication setup, user will have to create one subscription
+   subscribing from Node1 and another subscription subscribing from Node2 in
+   Node3 using <literal>subscribe_local_only</literal> option and
+   <literal>copy_data</literal> as <literal>true</literal>, while
+   the subscription is created, server will identify that Node2 is subscribing
+   from Node1 and Node1 is subscribing from Node2 and throw an error like:
+   <programlisting>
+postgres=# CREATE SUBSCRIPTION mysub CONNECTION 'host=192.168.1.50 port=5432 user=foo dbname=foodb' 
+           PUBLICATION mypublication, insert_only with (subscribe_local_only=on);
+ERROR:  CREATE/ALTER SUBSCRIPTION with subscribe_local_only and copy_data as true is not allowed when the publisher might have replicated data, table:public.t1 might have replicated data in the publisher
+HINT:  Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force
+   </programlisting>
+   In this scenario user can solve this based on one of the 2 possibilities,
+   a) If there are no data present in Node1 and Node2, then the user can create
+   the subscriptions to Node1 and Node2 with
+   <literal>subscribe_local_only</literal> as <literal>true</literal> and
+   <literal>copy_data</literal> as <literal>false</literal>. b) If the data is
+   present, then the user can create subscription with
+   <literal>copy_data</literal> as <literal>force</literal> on Node1 and
+   <literal>copy_data</literal> as <literal>false</literal> on Node2, before
+   allowing any operations on the respective tables of Node1 and Node2, in this
+   case <literal>copy_data</literal> is <literal>false</literal> on Node2
+   because the data will be replicated to each other and available on both the
+   nodes.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 55c48af2c9..202ea72189 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -69,6 +69,18 @@
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
 
+#define IS_COPY_DATA_VALID(copy_data) (copy_data != COPY_DATA_OFF)
+
+/*
+ * Represents whether copy_data option is specified with on, off or force.
+ */
+typedef enum CopyData
+{
+	COPY_DATA_OFF,
+	COPY_DATA_ON,
+	COPY_DATA_FORCE
+} CopyData;
+
 /*
  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
  * SUBSCRIPTION command options and the parsed/default values of each of them.
@@ -81,7 +93,7 @@ typedef struct SubOpts
 	bool		connect;
 	bool		enabled;
 	bool		create_slot;
-	bool		copy_data;
+	CopyData	copy_data;
 	bool		refresh;
 	bool		binary;
 	bool		streaming;
@@ -91,12 +103,69 @@ typedef struct SubOpts
 	XLogRecPtr	lsn;
 } SubOpts;
 
-static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_table_list(WalReceiverConn *wrconn, List *publications,
+							  CopyData copydata, bool only_local);
 static List *fetch_sequence_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
+/*
+ * Validate the value specified for copy_data option.
+ */
+static CopyData
+DefGetCopyData(DefElem *def)
+{
+	/*
+	 * If no parameter given, assume "true" is meant.
+	 */
+	if (def->arg == NULL)
+		return COPY_DATA_ON;
+
+	/*
+	 * Allow 0, 1, "true", "false", "on", "off" or "force".
+	 */
+	switch (nodeTag(def->arg))
+	{
+		case T_Integer:
+			switch (intVal(def->arg))
+			{
+				case 0:
+					return COPY_DATA_OFF;
+				case 1:
+					return COPY_DATA_ON;
+				default:
+					/* otherwise, error out below */
+					break;
+			}
+			break;
+		default:
+		{
+			char    *sval = defGetString(def);
+
+			/*
+			 * The set of strings accepted here should match up with
+			 * the grammar's opt_boolean_or_string production.
+			 */
+			if (pg_strcasecmp(sval, "true") == 0)
+				return COPY_DATA_ON;
+			if (pg_strcasecmp(sval, "false") == 0)
+				return COPY_DATA_OFF;
+			if (pg_strcasecmp(sval, "on") == 0)
+				return COPY_DATA_ON;
+			if (pg_strcasecmp(sval, "off") == 0)
+				return COPY_DATA_OFF;
+			if (pg_strcasecmp(sval, "force") == 0)
+				return COPY_DATA_FORCE;
+		}
+		break;
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_SYNTAX_ERROR),
+			errmsg("%s requires a boolean or \"force\"", def->defname));
+	return COPY_DATA_OFF;                                           /* keep compiler quiet */
+}
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -129,7 +198,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
 		opts->create_slot = true;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
-		opts->copy_data = true;
+		opts->copy_data = COPY_DATA_ON;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
@@ -197,7 +266,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 				errorConflictingDefElem(defel, pstate);
 
 			opts->specified_opts |= SUBOPT_COPY_DATA;
-			opts->copy_data = defGetBoolean(defel);
+			opts->copy_data = DefGetCopyData(defel);
 		}
 		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
 				 strcmp(defel->defname, "synchronous_commit") == 0)
@@ -334,17 +403,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "create_slot = true")));
 
-		if (opts->copy_data &&
+		if (IS_COPY_DATA_VALID(opts->copy_data) &&
 			IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "copy_data = true")));
+							"connect = false", "copy_data = true/force")));
 
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
-		opts->copy_data = false;
+		opts->copy_data = COPY_DATA_OFF;
 	}
 
 	/*
@@ -672,13 +741,15 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
 			 */
-			sync_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			sync_state = IS_COPY_DATA_VALID(opts.copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
 			/*
 			 * Get the table and sequence list from publisher and build
 			 * local relation sync status info.
 			 */
-			relations = fetch_table_list(wrconn, publications);
+			relations = fetch_table_list(wrconn, publications, opts.copy_data,
+										 opts.only_local);
+
 			relations = list_concat(relations,
 									fetch_sequence_list(wrconn, publications));
 
@@ -724,7 +795,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				 * as PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
 				 * PUBLICATION to work.
 				 */
-				if (opts.twophase && !opts.copy_data && relations != NIL)
+				if (opts.twophase && IS_COPY_DATA_VALID(opts.copy_data) &&
+					relations != NIL)
 					twophase_enabled = true;
 
 				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -763,8 +835,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data,
-						  List *validate_publications)
+AlterSubscription_refresh(Subscription *sub, CopyData copy_data,
+						  List *validate_publications, bool only_local)
 {
 	char	   *err;
 	List	   *pubrel_names;
@@ -799,7 +871,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			check_publications(wrconn, validate_publications);
 
 		/* Get the list of relations from publisher. */
-		pubrel_names = fetch_table_list(wrconn, sub->publications);
+		pubrel_names = fetch_table_list(wrconn, sub->publications, copy_data,
+										only_local);
 		pubrel_names = list_concat(pubrel_names,
 								   fetch_sequence_list(wrconn, sub->publications));
 
@@ -855,7 +928,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 						 list_length(subrel_states), sizeof(Oid), oid_cmp))
 			{
 				AddSubscriptionRelState(sub->oid, relid,
-										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+										IS_COPY_DATA_VALID(copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
 										InvalidXLogRecPtr);
 				ereport(DEBUG1,
 						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
@@ -1138,7 +1211,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
-				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH | SUBOPT_LOCAL_ONLY;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1161,7 +1234,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					 * See ALTER_SUBSCRIPTION_REFRESH for details why this is
 					 * not allowed.
 					 */
-					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && IS_COPY_DATA_VALID(opts.copy_data))
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
 								 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
@@ -1174,7 +1247,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					sub->publications = stmt->publication;
 
 					AlterSubscription_refresh(sub, opts.copy_data,
-											  stmt->publication);
+											  stmt->publication,
+											  opts.only_local);
 				}
 
 				break;
@@ -1186,7 +1260,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				List	   *publist;
 				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
-				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
+				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA | SUBOPT_LOCAL_ONLY;
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1226,7 +1300,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					sub->publications = publist;
 
 					AlterSubscription_refresh(sub, opts.copy_data,
-											  validate_publications);
+											  validate_publications,
+											  opts.only_local);
 				}
 
 				break;
@@ -1240,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
 				parse_subscription_options(pstate, stmt->options,
-										   SUBOPT_COPY_DATA, &opts);
+										   SUBOPT_COPY_DATA | SUBOPT_LOCAL_ONLY,
+										   &opts);
 
 				/*
 				 * The subscription option "two_phase" requires that
@@ -1259,7 +1335,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				 *
 				 * For more details see comments atop worker.c.
 				 */
-				if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+				if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+					IS_COPY_DATA_VALID(opts.copy_data))
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
@@ -1268,7 +1345,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
 
-				AlterSubscription_refresh(sub, opts.copy_data, NULL);
+				AlterSubscription_refresh(sub, opts.copy_data, NULL,
+										  opts.only_local);
 
 				break;
 			}
@@ -1783,22 +1861,28 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
  * publisher connection.
  */
 static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+fetch_table_list(WalReceiverConn *wrconn, List *publications, CopyData copydata,
+				 bool only_local)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, CHAROID};
 	List	   *tablelist = NIL;
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
+	appendStringInfoString(&cmd,
+						   "SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename, PS.srsubstate as replicated\n"
+						   "FROM pg_publication P,\n"
+						   "LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+						   "pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						   "WHERE C.oid = GPT.relid AND P.pubname in (");
+
 	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, tableRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -1824,6 +1908,25 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		rv = makeRangeVar(nspname, relname, -1);
 		tablelist = lappend(tablelist, rv);
 
+		/*
+		 * XXX: During initial table sync we cannot differentiate between the
+		 * local and non-local data that is present in the HEAP. Identification
+		 * of local data can be done only from the WAL by using the origin id.
+		 * Throw an error so that the user can take care of the initial data
+		 * copying and then create subscription with copy_data off.
+		 *
+		 * It is quite possible that subscriber has not yet pulled data to
+		 * the tables, but in ideal cases the table data will be subscribed.
+		 * Too keep the code simple it is not checked if the subscriber table
+		 * has pulled the data or not.
+		 */
+		if (copydata == COPY_DATA_ON && only_local && !slot_attisnull(slot, 3))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("CREATE/ALTER SUBSCRIPTION with subscribe_local_only and copy_data as true is not allowed when the publisher might have replicated data, table:%s.%s might have replicated data in the publisher",
+						   nspname, relname),
+					errhint("Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force"));
+
 		ExecClearTuple(slot);
 	}
 	ExecDropSingleTupleTableSlot(slot);
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 17b991b1fc..fad2e6b3aa 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -47,7 +47,9 @@ ERROR:  must be superuser to create subscriptions
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 -- fail - invalid option combinations
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
-ERROR:  connect = false and copy_data = true are mutually exclusive options
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true);
 ERROR:  connect = false and enabled = true are mutually exclusive options
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index badc00b7c7..dfe36e000e 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -40,6 +40,7 @@ SET SESSION AUTHORIZATION 'regress_subscription_user';
 
 -- fail - invalid option combinations
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = true);
diff --git a/src/test/subscription/t/032_circular.pl b/src/test/subscription/t/032_circular.pl
index 30e14bf727..6852372f8d 100644
--- a/src/test/subscription/t/032_circular.pl
+++ b/src/test/subscription/t/032_circular.pl
@@ -42,6 +42,10 @@ $node_A->safe_psql('postgres',
 $node_B->safe_psql('postgres',
 	"CREATE TABLE tab_full (a int PRIMARY KEY)");
 
+my $result;
+my $stdout;
+my $stderr;
+
 # Setup logical replication
 # node_A (pub) -> node_B (sub)
 my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
@@ -65,6 +69,25 @@ $node_A->safe_psql('postgres',	"
 	PUBLICATION tap_pub_B
 	WITH (subscribe_local_only = on, copy_data = off)");
 
+($result, $stdout, $stderr) = $node_A->psql('postgres',  "
+        CREATE SUBSCRIPTION tap_sub_A1
+        CONNECTION '$node_B_connstr application_name=$appname_A'
+        PUBLICATION tap_pub_B
+        WITH (subscribe_local_only = on, copy_data = on)");
+like(
+        $stderr,
+        qr/ERROR:  CREATE\/ALTER SUBSCRIPTION with subscribe_local_only and copy_data as true is not allowed when the publisher might have replicated data/,
+        "Create subscription with subscribe_local_only and copy_data having replicated table in publisher");
+
+$node_A->safe_psql('postgres',  "
+        CREATE SUBSCRIPTION tap_sub_A2
+        CONNECTION '$node_B_connstr application_name=$appname_A'
+        PUBLICATION tap_pub_B
+        WITH (subscribe_local_only = on, copy_data = force)");
+
+$node_A->safe_psql('postgres',  "
+        DROP SUBSCRIPTION tap_sub_A2");
+
 # Wait for subscribers to finish initialization
 $node_A->wait_for_catchup($appname_B);
 $node_B->wait_for_catchup($appname_A);
@@ -79,7 +102,6 @@ $node_B->poll_query_until('postgres', $synced_query)
 
 is(1,1, "Circular replication setup is complete");
 
-my $result;
 
 ##########################################################################
 # check that circular replication setup does not cause infinite recursive
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 72fafb795b..bd2e84a91b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -442,6 +442,7 @@ ConvProcInfo
 ConversionLocation
 ConvertRowtypeExpr
 CookedConstraint
+CopyData
 CopyDest
 CopyFormatOptions
 CopyFromState
-- 
2.32.0

Reply via email to