From 8fe92c301d84ed6e34cfe0e397ef14d929e07053 Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Mon, 8 Aug 2022 14:14:44 +0300
Subject: [PATCH v10] Allow logical replication to copy table in binary

If binary option is enabled in a subscription and copy_format is set to binary, then copy tables in
binary format during table synchronization.

The patch introduces a new parameter, copy_binary, as subscription option to
allow users to choose the format of initial table synchronization

Without this patch, table are copied in text format even if the
subscription is created with binary option. When binary format is
beneficial to use, allowing the subscription to copy tables in binary in
table sync phase may reduce the time spent on copy depending on column types.

Discussion: https://postgr.es/m/CAGPVpCQvAziCLknEnygY0v1-KBtg%2BOm-9JHJYZOnNPKFJPompw%40mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                  |   9 +
 doc/src/sgml/logical-replication.sgml       |   9 +-
 doc/src/sgml/ref/alter_subscription.sgml    |  20 +-
 doc/src/sgml/ref/create_subscription.sgml   |  16 ++
 src/backend/catalog/pg_subscription.c       |   1 +
 src/backend/catalog/system_views.sql        |   3 +-
 src/backend/commands/subscriptioncmds.c     |  57 ++++-
 src/backend/replication/logical/tablesync.c |  10 +-
 src/bin/pg_dump/pg_dump.c                   |  17 +-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/psql/describe.c                     |  11 +-
 src/bin/psql/tab-complete.c                 |  14 +-
 src/include/catalog/pg_subscription.h       |   2 +
 src/test/regress/expected/subscription.out  | 178 +++++++++-------
 src/test/regress/sql/subscription.sql       |  18 ++
 src/test/subscription/meson.build           |   1 +
 src/test/subscription/t/032_binary_copy.pl  | 223 ++++++++++++++++++++
 17 files changed, 493 insertions(+), 97 deletions(-)
 create mode 100644 src/test/subscription/t/032_binary_copy.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..90f886b916 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8002,6 +8002,15 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        origin.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subcopybinary</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, initial data synchronization will be performed in binary format
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..83caf40a05 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -241,10 +241,11 @@
    types of the columns do not need to match, as long as the text
    representation of the data can be converted to the target type.  For
    example, you can replicate from a column of type <type>integer</type> to a
-   column of type <type>bigint</type>.  The target table can also have
-   additional columns not provided by the published table.  Any such columns
-   will be filled with the default value as specified in the definition of the
-   target table.
+   column of type <type>bigint</type>.  However, replication in binary format is
+   type specific and does not allow to replicate data between different types
+   according to its restrictions.  The target table can also have additional
+   columns not provided by the published table.  Any such columns will be filled
+   with the default value as specified in the definition of the target table.
   </para>
 
   <sect2 id="logical-replication-subscription-slot">
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 964fcbb8ff..b962ccd986 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -179,6 +179,22 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>copy_binary</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether pre-existing data on the publisher will be copied
+          to the subscriber in binary format. The default is <literal>false</literal>.
+          Binary format is very data type specific, it will not allow copying
+          between different column types as opposed to text format. Note that
+          if this option is enabled, all data types which will be copied during
+          the initial synchronization should have binary send and receive functions.
+          If this option is disabled, data format for the initial synchronization
+          will be text.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
     </listitem>
    </varlistentry>
@@ -213,8 +229,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
       <literal>binary</literal>, <literal>streaming</literal>,
-      <literal>disable_on_error</literal>, and
-      <literal>origin</literal>.
+      <literal>disable_on_error</literal>, <literal>origin</literal>
+      and <literal>copy_binary</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 51c45f17c7..a2ca8adffc 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -349,6 +349,22 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+      <varlistentry>
+        <term><literal>copy_binary</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether pre-existing data on the publisher will be copied
+          to the subscriber in binary format. The default is <literal>false</literal>.
+          Binary format is very data type specific, it will not allow copying
+          between different column types as opposed to text format. Note that
+          if this option is enabled, all data types which will be copied during
+          the initial synchronization should have binary send and receive functions.
+          If this option is disabled, data format for the initial synchronization
+          will be text.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..ffd3324bc1 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->copybinary = subform->subcopybinary;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 34ca0e739f..2521606bbc 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1316,7 +1316,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subslotname, subsynccommit, subpublications, suborigin,
+              subcopybinary)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..7c3d67ad73 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,7 @@
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
 #define SUBOPT_ORIGIN				0x00001000
+#define SUBOPT_COPY_BINARY			0x00002000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +91,7 @@ typedef struct SubOpts
 	bool		disableonerr;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	bool	    copy_binary;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -146,6 +148,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->disableonerr = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_COPY_BINARY))
+		opts->copy_binary = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -324,6 +328,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_COPY_BINARY) &&
+				 strcmp(defel->defname, "copy_binary") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_COPY_BINARY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_COPY_BINARY;
+			opts->copy_binary = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -560,7 +573,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN |
+					  SUBOPT_COPY_BINARY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -636,6 +650,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+	values[Anum_pg_subscription_subcopybinary - 1] = BoolGetDatum(opts.copy_binary);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -1054,7 +1069,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN | SUBOPT_COPY_BINARY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1118,6 +1133,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_suborigin - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_COPY_BINARY))
+				{
+					values[Anum_pg_subscription_subcopybinary - 1] =
+						BoolGetDatum(opts.copy_binary);
+					replaces[Anum_pg_subscription_subcopybinary - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -1158,7 +1180,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		case ALTER_SUBSCRIPTION_SET_PUBLICATION:
 			{
-				supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+				supported_opts = (SUBOPT_COPY_DATA | SUBOPT_REFRESH |
+								  SUBOPT_COPY_BINARY);
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1166,6 +1189,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					publicationListToArray(stmt->publication);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
 
+				if (IsSet(opts.specified_opts, SUBOPT_COPY_BINARY))
+				{
+					values[Anum_pg_subscription_subcopybinary - 1] =
+						BoolGetDatum(opts.copy_binary);
+					replaces[Anum_pg_subscription_subcopybinary - 1] = true;
+				}
+
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
@@ -1205,7 +1235,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				List	   *publist;
 				bool		isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
 
-				supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
+				supported_opts = (SUBOPT_REFRESH | SUBOPT_COPY_DATA |
+								  SUBOPT_COPY_BINARY);
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
@@ -1214,6 +1245,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					publicationListToArray(publist);
 				replaces[Anum_pg_subscription_subpublications - 1] = true;
 
+				if (IsSet(opts.specified_opts, SUBOPT_COPY_BINARY))
+				{
+					values[Anum_pg_subscription_subcopybinary - 1] =
+						BoolGetDatum(opts.copy_binary);
+					replaces[Anum_pg_subscription_subcopybinary - 1] = true;
+				}
+
 				update_tuple = true;
 
 				/* Refresh if user asked us to. */
@@ -1265,8 +1303,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
+				supported_opts = SUBOPT_COPY_DATA | SUBOPT_COPY_BINARY;
 				parse_subscription_options(pstate, stmt->options,
-										   SUBOPT_COPY_DATA, &opts);
+										   supported_opts, &opts);
+
+				if (IsSet(opts.specified_opts, SUBOPT_COPY_BINARY))
+				{
+					values[Anum_pg_subscription_subcopybinary - 1] =
+						BoolGetDatum(opts.copy_binary);
+					replaces[Anum_pg_subscription_subcopybinary - 1] = true;
+				}
+				update_tuple = true;
 
 				/*
 				 * The subscription option "two_phase" requires that
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..c14d32fe07 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -101,6 +101,7 @@
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
@@ -1090,6 +1091,7 @@ copy_table(Relation rel)
 	CopyFromState cstate;
 	List	   *attnamelist;
 	ParseState *pstate;
+	List 	   *options = NIL;
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
@@ -1168,6 +1170,12 @@ copy_table(Relation rel)
 
 		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
+
+	appendStringInfo(&cmd, "  WITH (FORMAT %s)", MySubscription->copybinary ? "binary" : "text");
+	options = lappend(options, makeDefElem("format",
+											(Node *) makeString(MySubscription->copybinary ? "binary" : "text"),
+											-1));
+
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
 	if (res->status != WALRCV_OK_COPY_OUT)
@@ -1184,7 +1192,7 @@ copy_table(Relation rel)
 										 NULL, false, false);
 
 	attnamelist = make_copy_attnamelist(relmapentry);
-	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
 
 	/* Do the copy */
 	(void) CopyFrom(cstate);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 24ba936332..1a863464c7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4493,6 +4493,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subcopybinary;
 	int			i,
 				ntups;
 
@@ -4545,9 +4546,15 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	if (fout->remoteVersion >= 160000)
-		appendPQExpBufferStr(query, " s.suborigin\n");
+	{
+		appendPQExpBufferStr(query, " s.suborigin,\n"
+									" s.subcopybinary\n");
+	}
 	else
-		appendPQExpBuffer(query, " '%s' AS suborigin\n", LOGICALREP_ORIGIN_ANY);
+	{
+		appendPQExpBuffer(query, " '%s' AS suborigin,\n", LOGICALREP_ORIGIN_ANY);
+		appendPQExpBuffer(query, " false AS subcopybinary\n");
+	}
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
@@ -4575,6 +4582,7 @@ getSubscriptions(Archive *fout)
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
+	i_subcopybinary = PQfnumber(res, "subcopybinary");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4605,6 +4613,8 @@ getSubscriptions(Archive *fout)
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
+		subinfo[i].subcopybinary =
+			pg_strdup(PQgetvalue(res, i, i_subcopybinary));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4686,6 +4696,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (strcmp(subinfo->subcopybinary, "t") == 0)
+		appendPQExpBuffer(query, ", copy_binary = true");
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index cdca0b993d..dc1227e494 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo
 	char	   *suborigin;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *subcopybinary;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c8a0bb7b3a..edace80acb 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6472,7 +6472,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6527,10 +6527,15 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two-phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* Copy binary and origin are only supported in v16 and higher */
 		if (pset.sversion >= 160000)
+		{
 			appendPQExpBuffer(&buf,
-							  ", suborigin AS \"%s\"\n",
-							  gettext_noop("Origin"));
+							  ", suborigin AS \"%s\"\n"
+							  ", subcopybinary AS \"%s\"\n",
+							  gettext_noop("Origin"),
+							  gettext_noop("Copy binary"));
+		}
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5e1882eaea..9635eb2740 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1919,14 +1919,14 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> REFRESH PUBLICATION WITH ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
 			 TailMatches("REFRESH", "PUBLICATION", "WITH", "("))
-		COMPLETE_WITH("copy_data");
+		COMPLETE_WITH("copy_binary", "copy_data");
 	/* ALTER SUBSCRIPTION <name> SET */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, "SET"))
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "origin", "slot_name",
-					  "streaming", "synchronous_commit");
+		COMPLETE_WITH("binary", "copy_binary", "disable_on_error", "origin",
+					  "slot_name", "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -1942,7 +1942,7 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER SUBSCRIPTION <name> ADD|DROP|SET PUBLICATION <name> WITH ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
 			 TailMatches("ADD|DROP|SET", "PUBLICATION", MatchAny, "WITH", "("))
-		COMPLETE_WITH("copy_data", "refresh");
+		COMPLETE_WITH("copy_binary", "copy_data", "refresh");
 
 	/* ALTER SCHEMA <name> */
 	else if (Matches("ALTER", "SCHEMA", MatchAny))
@@ -3267,9 +3267,9 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("WITH (");
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
-		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "origin", "slot_name",
-					  "streaming", "synchronous_commit", "two_phase");
+		COMPLETE_WITH("binary", "connect", "copy_binary", "copy_data",
+					  "create_slot", "disable_on_error", "enabled", "origin",
+					  "slot_name", "streaming", "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..a93a3f3dab 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -88,6 +88,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
 
+	bool		subcopybinary; 	/* True if initial sync will be performed in binary*/
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -131,6 +132,7 @@ typedef struct Subscription
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
+	bool		copybinary;		/* Indicates if initial sync will be in binary */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 3f99b14394..700bd108b0 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -114,18 +114,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | none   | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                         List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -143,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -163,10 +163,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -175,10 +175,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -210,10 +210,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                               List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                      List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | off       | d                | f                | any    | f           | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -247,19 +247,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -271,27 +271,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -306,10 +306,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                        List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more than once
@@ -324,10 +324,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -363,10 +363,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -375,10 +375,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -388,10 +388,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -404,18 +404,52 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | t                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - copy_binary must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_binary = foo);
+ERROR:  copy_binary requires a Boolean value
+-- now it works - create a subscription with copy_binary = true
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_binary = true);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t           | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+-- alter copy_binary to false
+ALTER SUBSCRIPTION regress_testsub SET (copy_binary = false);
+\dRs+
+                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | f           | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+-- add publication with copy_binary = true
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1 WITH (refresh = false, copy_binary = true);
+\dRs+
+                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |    Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Copy binary | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+--------------------+--------+-----------+------------------+------------------+--------+-------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1} | f      | off       | d                | f                | any    | t           | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7281f5fee2..f6b08cbb08 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -286,6 +286,24 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - copy_binary must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_binary = foo);
+
+-- now it works - create a subscription with copy_binary = true
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_binary = true);
+\dRs+
+
+-- alter copy_binary to false
+ALTER SUBSCRIPTION regress_testsub SET (copy_binary = false);
+\dRs+
+
+-- add publication with copy_binary = true
+ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1 WITH (refresh = false, copy_binary = true);
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 3db0fdfd96..f9ab6eb7e1 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -38,6 +38,7 @@ tests += {
       't/029_on_error.pl',
       't/030_origin.pl',
       't/031_column_list.pl',
+      't/032_binary_copy.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/032_binary_copy.pl b/src/test/subscription/t/032_binary_copy.pl
new file mode 100644
index 0000000000..bcad66e5ea
--- /dev/null
+++ b/src/test/subscription/t/032_binary_copy.pl
@@ -0,0 +1,223 @@
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables on both sides of the replication
+my $ddl = qq(
+	CREATE TABLE public.test_numerical (
+		a INTEGER PRIMARY KEY,
+		b NUMERIC,
+		c FLOAT,
+		d BIGINT
+	);
+	CREATE TABLE public.test_arrays (
+		a INTEGER[] PRIMARY KEY,
+		b NUMERIC[],
+		c TEXT[]
+	);
+    CREATE TABLE public.test_range_array (
+		a INTEGER PRIMARY KEY,
+		b TSTZRANGE,
+		c int8range[]
+	);
+    CREATE TYPE public.test_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER);
+    CREATE TABLE public.test_one_comp (
+		a INTEGER PRIMARY KEY,
+		b public.test_comp_basic_t
+	););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Publish all tables
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tpub FOR ALL TABLES");
+
+# Insert some content and before creating a subscription
+$node_publisher->safe_psql(
+	'postgres', qq(
+    INSERT INTO public.test_numerical (a, b, c, d) VALUES
+		(1, 1.2, 1.3, 10),
+        (2, 2.2, 2.3, 20);
+	INSERT INTO public.test_arrays (a, b, c) VALUES
+		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
+        ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+    INSERT INTO test_range_array (a, b, c) VALUES
+		(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
+		(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}');
+	INSERT INTO test_one_comp (a, b) VALUES
+		(1, ROW(1.0, 'a', 1)),
+		(2, ROW(2.0, 'b', 2));
+	));
+
+# Create the subscription with copy_binary = true
+my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+	    "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
+	  . "PUBLICATION tpub WITH (slot_name = tpub_slot, copy_binary = true)");
+
+# Ensure nodes are in sync with each other
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+
+my $sync_check =  qq(
+	SET timezone = '+2';
+	SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;
+	SELECT a, b FROM test_one_comp ORDER BY a;
+	SELECT a, b, c FROM test_range_array ORDER BY a;
+);
+
+# Check the synced data on subscribers
+my $result = $node_subscriber->safe_psql('postgres', $sync_check);
+
+is( $result, '1|1.2|1.3|10
+2|2.2|2.3|20
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}
+1|(1,a,1)
+2|(2,b,2)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}', 'check synced data on subscriber');
+
+# Create a custom type without send/rcv functions
+$ddl = qq(
+    CREATE TYPE myvarchar;
+    CREATE FUNCTION myvarcharin(cstring, oid, integer) RETURNS myvarchar
+        LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin';
+    CREATE FUNCTION myvarcharout(myvarchar) RETURNS cstring
+        LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout';
+    CREATE TYPE myvarchar (
+        input = myvarcharin,
+        output = myvarcharout);
+    CREATE TABLE public.test_myvarchar (
+        a myvarchar
+    ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Insert some initial data
+$node_publisher->safe_psql(
+	'postgres', qq(
+    INSERT INTO public.test_myvarchar (a) VALUES
+		('a');
+    ));
+
+# Refresh the publication to trigger the tablesync
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tsub REFRESH PUBLICATION");
+
+# It should fail
+$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/);
+
+# Create and set send/rcv functions for the custom type
+$ddl = qq(
+    CREATE FUNCTION myvarcharsend(myvarchar) RETURNS bytea
+        LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend';
+    CREATE FUNCTION myvarcharrecv(internal, oid, integer) RETURNS myvarchar
+        LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv';
+    ALTER TYPE myvarchar SET (
+        send = myvarcharsend,
+        receive = myvarcharrecv
+    ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Now tablesync should succeed
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+
+$sync_check =  qq(
+	SET timezone = '+2';
+	SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;
+	SELECT a, b FROM test_one_comp ORDER BY a;
+	SELECT a, b, c FROM test_range_array ORDER BY a;
+    SELECT a FROM test_myvarchar;
+);
+
+# Check the synced data on subscriber
+$result = $node_subscriber->safe_psql('postgres', $sync_check);
+
+is( $result, '1|1.2|1.3|10
+2|2.2|2.3|20
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}
+1|(1,a,1)
+2|(2,b,2)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+a', 'check synced data on subscriber with custom type');
+
+# Test syncing tables with mismatching column types
+$node_publisher->safe_psql(
+    'postgres', qq(
+    CREATE TABLE public.test_mismatching_types (
+        a bigint PRIMARY KEY
+    );
+    INSERT INTO public.test_mismatching_types (a)
+        VALUES (1), (2);
+    ));
+
+$node_subscriber->safe_psql(
+    'postgres', qq(
+    CREATE TABLE public.test_mismatching_types (
+        a int PRIMARY KEY
+    );
+    ALTER SUBSCRIPTION tsub REFRESH PUBLICATION;
+    ));
+
+# Cannot sync due to type mismatch
+$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/);
+
+# Setting copy_binary to false should allow syncing
+$node_subscriber->safe_psql(
+    'postgres', qq(
+    ALTER SUBSCRIPTION tsub SET (copy_binary = false);));
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+
+$sync_check =  qq(
+	SET timezone = '+2';
+	SELECT a, b, c, d FROM test_numerical ORDER BY a;
+	SELECT a, b, c FROM test_arrays ORDER BY a;
+	SELECT a, b FROM test_one_comp ORDER BY a;
+	SELECT a, b, c FROM test_range_array ORDER BY a;
+    SELECT a FROM test_myvarchar;
+    SELECT a FROM test_mismatching_types ORDER BY a;
+);
+
+# Check the synced data on subscribers
+$result = $node_subscriber->safe_psql('postgres', $sync_check);
+
+is( $result, '1|1.2|1.3|10
+2|2.2|2.3|20
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}
+1|(1,a,1)
+2|(2,b,2)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+a
+1
+2', 'check synced data on subscriber with copy_binary = false');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.25.1

