From 6a961ebaca47eb54bb20f2cdba1811653274d21b Mon Sep 17 00:00:00 2001
From: Daniel Gustafsson <daniel@yesql.se>
Date: Tue, 14 Jul 2020 13:54:37 +0200
Subject: [PATCH] Add binary protocol for publications and subscriptions

This adds support for transferring base types using binary format between
subscriber and publisher. A new column is added to pg_subscription, subbinary,
in order to track whether binary is enabled or not.

Author: Dave Cramer
---
 doc/src/sgml/catalogs.sgml                    |  10 ++
 doc/src/sgml/ref/alter_subscription.sgml      |   4 +-
 doc/src/sgml/ref/create_subscription.sgml     |  16 +++
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   2 +-
 src/backend/commands/subscriptioncmds.c       |  41 +++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   3 +-
 src/backend/replication/logical/proto.c       | 123 ++++++++++------
 src/backend/replication/logical/worker.c      | 134 ++++++++++++------
 src/backend/replication/pgoutput/pgoutput.c   |  34 ++++-
 src/bin/psql/describe.c                       |   8 +-
 src/include/catalog/pg_subscription.h         |   5 +
 src/include/replication/logicalproto.h        |  30 ++--
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/test/regress/expected/subscription.out    |  56 +++++---
 src/test/regress/sql/subscription.sql         |  12 ++
 src/test/subscription/t/014_binary.pl         | 108 ++++++++++++++
 18 files changed, 458 insertions(+), 131 deletions(-)
 create mode 100644 src/test/subscription/t/014_binary.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index e9cdff4864..fe317ec37f 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7504,6 +7504,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+    <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subbinary</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the subscription will request that the publisher send base
+       types in binary format.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index c24ace14d1..98ca11cb1c 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -163,8 +163,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
      <para>
       This clause alters parameters originally set by
       <xref linkend="sql-createsubscription"/>.  See there for more
-      information.  The allowed options are <literal>slot_name</literal> and
-      <literal>synchronous_commit</literal>
+      information.  The allowed options are <literal>slot_name</literal>, 
+      <literal>synchronous_commit</literal> and <literal>binary</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 5bbc165f70..1d71368254 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -128,6 +128,22 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>binary</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription will request the publisher to
+          send the base types in binary or not.  The default
+          is <literal>false</literal>.
+         </para>
+         
+         <para>
+          Note: Only types that have send and receive functions will be
+          transferred in binary
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>slot_name</literal> (<type>string</type>)</term>
         <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index cb15731115..e6afb3203e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -65,6 +65,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->binary = subform->subbinary;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b6d35c2d11..7f02d6dbb2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1122,7 +1122,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
     ON pg_subscription TO public;
 
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9ebb026187..d87de2ac2f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -59,7 +59,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh)
+						   bool *refresh, bool *binary_given, bool *binary)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -90,6 +90,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*synchronous_commit = NULL;
 	if (refresh)
 		*refresh = true;
+	if (binary)
+	{
+		*binary_given = false;
+
+		/*
+		 * Not all versions of pgoutput will understand this option, so
+		 * default to false.
+		 */
+		*binary = false;
+	}
 
 	/* Parse options */
 	foreach(lc, options)
@@ -175,6 +185,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		{
+			*binary_given = true;
+			*binary = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -325,6 +340,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
 	List	   *publications;
+	bool		binary;
+	bool		binary_given;
 
 	/*
 	 * Parse and check options.
@@ -334,7 +351,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
 							   &enabled, &create_slot, &slotname_given,
 							   &slotname, &copy_data, &synchronous_commit,
-							   NULL);
+							   NULL, &binary_given, &binary);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -400,6 +417,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (slotname)
@@ -669,10 +687,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
+				bool		binary_given;
+				bool		binary;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL);
+										   NULL, &synchronous_commit, NULL,
+										   &binary_given, &binary);
 
 				if (slotname_given)
 				{
@@ -697,6 +718,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
+				if (binary_given)
+				{
+					values[Anum_pg_subscription_subbinary - 1] =
+						BoolGetDatum(binary);
+					replaces[Anum_pg_subscription_subbinary - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -708,7 +736,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL, NULL,
+										   NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -746,7 +775,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh);
+										   NULL, &refresh, NULL, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -783,7 +812,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e4fd1f9bb6..7989b58019 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -423,7 +423,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
 		PQfreemem(pubnames_literal);
 		pfree(pubnames_str);
-
+		if (options->proto.logical.binary)
+			appendStringInfoString(&cmd, ", binary 'true'");
 		appendStringInfoChar(&cmd, ')');
 	}
 	else
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3c6d0cd171..b91ca2714d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -17,8 +17,10 @@
 #include "catalog/pg_type.h"
 #include "libpq/pqformat.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 
 /*
@@ -31,7 +33,7 @@
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple);
+								   HeapTuple tuple, bool binary_basetypes);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -139,7 +141,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  * Write INSERT to the output stream.
  */
 void
-logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
+logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary_basetypes)
 {
 	pq_sendbyte(out, 'I');		/* action INSERT */
 
@@ -147,7 +149,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple);
+	logicalrep_write_tuple(out, rel, newtuple, binary_basetypes);
 }
 
 /*
@@ -179,7 +181,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
-						HeapTuple newtuple)
+						HeapTuple newtuple, bool binary_basetypes)
 {
 	pq_sendbyte(out, 'U');		/* action UPDATE */
 
@@ -196,26 +198,22 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple);
+		logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple);
+	logicalrep_write_tuple(out, rel, newtuple, binary_basetypes);
 }
 
 /*
  * Read UPDATE from stream.
  */
-LogicalRepRelId
+void
 logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 					   LogicalRepTupleData *oldtup,
 					   LogicalRepTupleData *newtup)
 {
 	char		action;
-	LogicalRepRelId relid;
-
-	/* read the relation id */
-	relid = pq_getmsgint(in, 4);
 
 	/* read and verify action */
 	action = pq_getmsgbyte(in);
@@ -240,15 +238,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
 			 action);
 
 	logicalrep_read_tuple(in, newtup);
-
-	return relid;
 }
 
 /*
  * Write DELETE to the output stream.
  */
 void
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
+logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary_basetypes)
 {
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -264,7 +260,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple);
+	logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes);
 }
 
 /*
@@ -272,14 +268,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
  *
  * Fills the old tuple.
  */
-LogicalRepRelId
+void
 logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
 {
 	char		action;
-	LogicalRepRelId relid;
-
-	/* read the relation id */
-	relid = pq_getmsgint(in, 4);
 
 	/* read and verify action */
 	action = pq_getmsgbyte(in);
@@ -287,8 +279,6 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
 		elog(ERROR, "expected action 'O' or 'K', got %c", action);
 
 	logicalrep_read_tuple(in, oldtup);
-
-	return relid;
 }
 
 /*
@@ -437,7 +427,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary_basetypes)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
@@ -453,6 +443,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
 			continue;
 		nliveatts++;
 	}
+
 	pq_sendint16(out, nliveatts);
 
 	/* try to allocate enough memory from the get-go */
@@ -479,7 +470,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
 		}
 		else if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
 		{
-			pq_sendbyte(out, 'u');	/* unchanged toast column */
+			pq_sendbyte(out, LOGICALREP_UNCHANGED);
 			continue;
 		}
 
@@ -488,12 +479,31 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
 		typclass = (Form_pg_type) GETSTRUCT(typtup);
 
-		pq_sendbyte(out, 't');	/* 'text' data follows */
+		if (binary_basetypes &&
+			OidIsValid(typclass->typreceive) &&
+			(att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') &&
+			(att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid))
+		{
+			bytea	   *outputbytes;
+			int			len;
+
+			pq_sendbyte(out, LOGICALREP_BINARY);
 
-		outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
-		pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
-		pfree(outputstr);
+			outputbytes = OidSendFunctionCall(typclass->typsend,
+											  values[i]);
 
+			len = VARSIZE(outputbytes) - VARHDRSZ;
+			pq_sendint(out, len, 4);	/* length */
+			pq_sendbytes(out, VARDATA(outputbytes), len);	/* data */
+			pfree(outputbytes);
+		}
+		else
+		{
+			pq_sendbyte(out, LOGICALREP_TEXT);
+			outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
+			pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
+			pfree(outputstr);
+		}
 		ReleaseSysCache(typtup);
 	}
 }
@@ -512,7 +522,11 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 	/* Get number of attributes */
 	natts = pq_getmsgint(in, 2);
 
-	memset(tuple->changed, 0, sizeof(tuple->changed));
+	tuple->values = palloc(natts * sizeof(StringInfoData *));
+
+	/* default is unchanged */
+	tuple->format = palloc(natts * sizeof(char));
+	memset(tuple->format, LOGICALREP_UNCHANGED, natts * sizeof(char));
 
 	/* Read the data */
 	for (i = 0; i < natts; i++)
@@ -524,25 +538,52 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
 		switch (kind)
 		{
 			case 'n':			/* null */
-				tuple->values[i] = NULL;
-				tuple->changed[i] = true;
-				break;
-			case 'u':			/* unchanged column */
-				/* we don't receive the value of an unchanged column */
-				tuple->values[i] = NULL;
-				break;
-			case 't':			/* text formatted value */
+				{
+					tuple->values[i] = (StringInfoData *) NULL;
+					tuple->format[i] = LOGICALREP_TEXT;
+					break;
+				}
+			case LOGICALREP_UNCHANGED:
+				{
+					/* we don't receive the value of an unchanged column */
+					tuple->values[i] = (StringInfoData *) NULL;
+					tuple->format[i] = LOGICALREP_UNCHANGED;	/* be explicit */
+					break;
+				}
+			case LOGICALREP_BINARY:
 				{
 					int			len;
+					StringInfoData *value = palloc(sizeof(StringInfoData));
+
+					tuple->format[i] = LOGICALREP_BINARY;
+
+					len = pq_getmsgint(in, 4);	/* read length */
 
-					tuple->changed[i] = true;
+					/* and data */
+					value->data = palloc(len + 1);
+					pq_copymsgbytes(in, value->data, len);
+					value->len = len;
+					value->cursor = 0;
+					value->maxlen = len;
+					/* not strictly necessary but the docs say it is required */
+					value->data[len] = '\0';
+					tuple->values[i] = value;
+					break;
+				}
+			case LOGICALREP_TEXT:
+				{
+					int			len;
+					StringInfoData *value = palloc(sizeof(StringInfoData));
 
+					tuple->format[i] = LOGICALREP_TEXT;
 					len = pq_getmsgint(in, 4);	/* read length */
 
 					/* and data */
-					tuple->values[i] = palloc(len + 1);
-					pq_copymsgbytes(in, tuple->values[i], len);
-					tuple->values[i][len] = '\0';
+					value->data = palloc(len + 1);
+					pq_copymsgbytes(in, value->data, len);
+					value->len = len;
+					value->data[len] = '\0';
+					tuple->values[i] = value;
 				}
 				break;
 			default:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f90a896fc3..844845c473 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -319,13 +319,12 @@ slot_store_error_callback(void *arg)
 }
 
 /*
- * Store data in C string form into slot.
- * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our
- * use better.
+ * Store data into slot.
+ * Data can be either text or binary transfer format
  */
 static void
-slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
-					char **values)
+slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
+				LogicalRepTupleData *tupleData)
 {
 	int			natts = slot->tts_tupleDescriptor->natts;
 	int			i;
@@ -351,18 +350,42 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 		int			remoteattnum = rel->attrmap->attnums[i];
 
 		if (!att->attisdropped && remoteattnum >= 0 &&
-			values[remoteattnum] != NULL)
+			tupleData->values[remoteattnum] != NULL)
 		{
-			Oid			typinput;
-			Oid			typioparam;
 
 			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
-			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-			slot->tts_values[i] =
-				OidInputFunctionCall(typinput, values[remoteattnum],
-									 typioparam, att->atttypmod);
+			if (tupleData->format[remoteattnum] == LOGICALREP_BINARY)
+			{
+				Oid			typreceive;
+				Oid			typioparam;
+				int			cursor;
+
+				cursor = tupleData->values[remoteattnum]->cursor;
+				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+				slot->tts_values[i] =
+					OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum],
+										   typioparam, att->atttypmod);
+
+				/*
+				 * Do not advance the cursor in case we need to re-read this
+				 * This saves us from pushing all of this type logic into
+				 * proto.c
+				 */
+				tupleData->values[remoteattnum]->cursor = cursor;
+
+			}
+			else
+			{
+				Oid			typinput;
+				Oid			typioparam;
+
+				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+				slot->tts_values[i] =
+					OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data,
+										 typioparam, att->atttypmod);
+			}
 			slot->tts_isnull[i] = false;
 
 			errarg.local_attnum = -1;
@@ -387,9 +410,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
 }
 
 /*
- * Replace selected columns with user data provided as C strings.
- * This is somewhat similar to heap_modify_tuple but also calls the type
- * input functions on the user data.
+ * Replace selected columns with user data provided either as C strings or in
+ * binary format. This is somewhat similar to heap_modify_tuple but also calls
+ * the type input functions on the user data.
  * "slot" is filled with a copy of the tuple in "srcslot", with
  * columns selected by the "replaces" array replaced with data values
  * from "values".
@@ -398,9 +421,9 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
  * need to materialize "slot" at the end to make it independent of "srcslot".
  */
 static void
-slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
-					 LogicalRepRelMapEntry *rel,
-					 char **values, bool *replaces)
+slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
+				 LogicalRepRelMapEntry *rel,
+				 LogicalRepTupleData *tupleData)
 {
 	int			natts = slot->tts_tupleDescriptor->natts;
 	int			i;
@@ -438,21 +461,44 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot,
 		if (remoteattnum < 0)
 			continue;
 
-		if (!replaces[remoteattnum])
+		if (tupleData->format[remoteattnum] == LOGICALREP_UNCHANGED)
 			continue;
 
-		if (values[remoteattnum] != NULL)
+		if (tupleData->values[remoteattnum] != NULL)
 		{
-			Oid			typinput;
-			Oid			typioparam;
-
 			errarg.local_attnum = i;
 			errarg.remote_attnum = remoteattnum;
 
-			getTypeInputInfo(att->atttypid, &typinput, &typioparam);
-			slot->tts_values[i] =
-				OidInputFunctionCall(typinput, values[remoteattnum],
-									 typioparam, att->atttypmod);
+			if (tupleData->format[remoteattnum] == LOGICALREP_BINARY)
+			{
+				Oid			typreceive;
+				Oid			typioparam;
+				int			cursor;
+
+				cursor = tupleData->values[remoteattnum]->cursor;
+
+				getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
+				slot->tts_values[i] =
+					OidReceiveFunctionCall(typreceive, tupleData->values[remoteattnum],
+										   typioparam, att->atttypmod);
+
+				/*
+				 * Do not advance the cursor in case we need to re-read this
+				 * This saves us from pushing all of this type logic into
+				 * proto.c
+				 */
+				tupleData->values[remoteattnum]->cursor = cursor;
+			}
+			else
+			{
+				Oid			typinput;
+				Oid			typioparam;
+
+				getTypeInputInfo(att->atttypid, &typinput, &typioparam);
+				slot->tts_values[i] =
+					OidInputFunctionCall(typinput, tupleData->values[remoteattnum]->data,
+										 typioparam, att->atttypmod);
+			}
 			slot->tts_isnull[i] = false;
 
 			errarg.local_attnum = -1;
@@ -618,8 +664,10 @@ apply_handle_insert(StringInfo s)
 
 	ensure_transaction();
 
+	/* read the relation id */
 	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
 	if (!should_apply_changes_for_rel(rel))
 	{
 		/*
@@ -641,7 +689,7 @@ apply_handle_insert(StringInfo s)
 
 	/* Process and store remote tuple in the slot */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	slot_store_cstrings(remoteslot, rel, newtup.values);
+	slot_store_data(remoteslot, rel, &newtup);
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
@@ -733,9 +781,12 @@ apply_handle_update(StringInfo s)
 
 	ensure_transaction();
 
-	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
-								   &newtup);
+	/* read the relation id */
+	relid = pq_getmsgint(s, 4);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+	logicalrep_read_update(s, &has_oldtup, &oldtup,
+						   &newtup);
 	if (!should_apply_changes_for_rel(rel))
 	{
 		/*
@@ -765,7 +816,7 @@ apply_handle_update(StringInfo s)
 	target_rte = list_nth(estate->es_range_table, 0);
 	for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
 	{
-		if (newtup.changed[i])
+		if (newtup.format[i] != LOGICALREP_UNCHANGED)
 			target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
 													 i + 1 - FirstLowInvalidHeapAttributeNumber);
 	}
@@ -776,8 +827,8 @@ apply_handle_update(StringInfo s)
 
 	/* Build the search tuple. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	slot_store_cstrings(remoteslot, rel,
-						has_oldtup ? oldtup.values : newtup.values);
+	slot_store_data(remoteslot, rel,
+					has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
 	/* For a partitioned table, apply update to correct partition. */
@@ -831,8 +882,7 @@ apply_handle_update_internal(ResultRelInfo *relinfo,
 	{
 		/* Process and store remote tuple in the slot */
 		oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		slot_modify_cstrings(remoteslot, localslot, relmapentry,
-							 newtup->values, newtup->changed);
+		slot_modify_data(remoteslot, localslot, relmapentry, newtup);
 		MemoryContextSwitchTo(oldctx);
 
 		EvalPlanQualSetSlot(&epqstate, remoteslot);
@@ -875,8 +925,11 @@ apply_handle_delete(StringInfo s)
 
 	ensure_transaction();
 
-	relid = logicalrep_read_delete(s, &oldtup);
+	/* read the relation id */
+	relid = pq_getmsgint(s, 4);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
+
+	logicalrep_read_delete(s, &oldtup);
 	if (!should_apply_changes_for_rel(rel))
 	{
 		/*
@@ -900,7 +953,7 @@ apply_handle_delete(StringInfo s)
 
 	/* Build the search tuple. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	slot_store_cstrings(remoteslot, rel, oldtup.values);
+	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
 	/* For a partitioned table, apply delete to correct partition. */
@@ -1096,9 +1149,9 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
 				if (found)
 				{
 					/* Apply the update.  */
-					slot_modify_cstrings(remoteslot_part, localslot,
-										 part_entry,
-										 newtup->values, newtup->changed);
+					slot_modify_data(remoteslot_part, localslot,
+									 part_entry,
+									 newtup);
 					MemoryContextSwitchTo(oldctx);
 				}
 				else
@@ -2106,6 +2159,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.slotname = myslotname;
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.binary = MySubscription->binary;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 15379e3118..e209ad8aa5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -20,6 +20,7 @@
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -118,11 +119,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names)
+						List **publication_names, bool *binary_basetypes)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
+	bool		binary_option_given = false;
+
+	*binary_basetypes = false;
 
 	foreach(lc, options)
 	{
@@ -138,7 +142,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (protocol_version_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						 errmsg("conflicting or redundant options, \"%s\" already provided", defel->defname)));
 			protocol_version_given = true;
 
 			if (!scanint8(strVal(defel->arg), true, &parsed))
@@ -159,7 +163,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 			if (publication_names_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("conflicting or redundant options")));
+						 errmsg("conflicting or redundant options, \"%s\" already provided", defel->defname)));
 			publication_names_given = true;
 
 			if (!SplitIdentifierString(strVal(defel->arg), ',',
@@ -168,6 +172,19 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 						(errcode(ERRCODE_INVALID_NAME),
 						 errmsg("invalid publication_names syntax")));
 		}
+		else if (strcmp(defel->defname, "binary") == 0)
+		{
+			if (binary_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options, \"%s\" already provided", defel->defname)));
+			binary_option_given = true;
+
+			if (!parse_bool(strVal(defel->arg), binary_basetypes))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("invalid binary option")));
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -202,7 +219,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		/* Parse the params and ERROR if we see any we don't recognize */
 		parse_output_parameters(ctx->output_plugin_options,
 								&data->protocol_version,
-								&data->publication_names);
+								&data->publication_names,
+								&data->binary_basetypes);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@@ -397,6 +415,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	switch (change->action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
+
 			{
 				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
 
@@ -411,7 +430,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				}
 
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_insert(ctx->out, relation, tuple);
+				logicalrep_write_insert(ctx->out, relation, tuple, data->binary_basetypes);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -435,7 +454,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				}
 
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
+				logicalrep_write_update(ctx->out, relation, oldtuple, newtuple, data->binary_basetypes);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -455,7 +474,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				}
 
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, relation, oldtuple);
+				logicalrep_write_delete(ctx->out, relation, oldtuple, data->binary_basetypes);
+
 				OutputPluginWrite(ctx, true);
 			}
 			else
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 3b870c3b17..a7fd29085f 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5963,7 +5963,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false};
+	false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -5987,6 +5987,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 					  gettext_noop("Enabled"),
 					  gettext_noop("Publication"));
 
+	/* Binary mode is only supported in v14 and higher */
+	if (pset.sversion >= 140000)
+		appendPQExpBuffer(&buf,
+						  ", subbinary AS \"%s\"\n",
+						  gettext_noop("Binary"));
+
 	if (verbose)
 	{
 		appendPQExpBuffer(&buf,
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0a756d42d8..e8b3712574 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	bool		subbinary;		/* True if the subscription wants the output
+								 * plugin to send data in binary */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +76,8 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	bool		binary;			/* Indicates if the subscription wants data in
+								 * binary format */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 4860561be9..3971f007ae 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -27,13 +27,19 @@
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
 #define LOGICALREP_PROTO_VERSION_NUM 1
 
+#define LOGICALREP_UNCHANGED 'u'
+#define LOGICALREP_BINARY 'b'
+#define LOGICALREP_TEXT 't'
+
 /* Tuple coming via logical replication. */
 typedef struct LogicalRepTupleData
 {
-	/* column values in text format, or NULL for a null value: */
-	char	   *values[MaxTupleAttributeNumber];
-	/* markers for changed/unchanged column values: */
-	bool		changed[MaxTupleAttributeNumber];
+	/* column values */
+	StringInfoData **values;
+
+	/* markers for changed/unchanged/binary/text */
+	char	   *format;
+
 } LogicalRepTupleData;
 
 typedef uint32 LogicalRepRelId;
@@ -87,17 +93,17 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 									XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, Relation rel,
-									HeapTuple newtuple);
+									HeapTuple newtuple, bool binary_basetypes);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple);
-extern LogicalRepRelId logicalrep_read_update(StringInfo in,
-											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
-											  LogicalRepTupleData *newtup);
+									HeapTuple newtuple, bool binary_basetypes);
+extern void logicalrep_read_update(StringInfo in,
+								   bool *has_oldtuple, LogicalRepTupleData *oldtup,
+								   LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, Relation rel,
-									HeapTuple oldtuple);
-extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
-											  LogicalRepTupleData *oldtup);
+									HeapTuple oldtuple, bool binary_basetypes);
+extern void logicalrep_read_delete(StringInfo in,
+								   LogicalRepTupleData *oldtup);
 extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
 									  bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 2e8e9daf44..a2d76689ed 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -25,6 +25,7 @@ typedef struct PGOutputData
 
 	List	   *publication_names;
 	List	   *publications;
+	bool		binary_basetypes;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index c75dcebea0..7cd1458819 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -177,6 +177,7 @@ typedef struct
 		{
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
+			bool		binary; /* Ask publisher output plugin to use binary */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index e7add9d2b8..af6ed982ee 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | off                | dbname=regress_doesnotexist
+                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -91,27 +91,27 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | off                | dbname=regress_doesnotexist2
+                                                          List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
 ALTER SUBSCRIPTION regress_testsub ENABLE;
 \dRs
-                            List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     
------------------+---------------------------+---------+---------------------
- regress_testsub | regress_subscription_user | t       | {testpub2,testpub3}
+                                List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary 
+-----------------+---------------------------+---------+---------------------+--------
+ regress_testsub | regress_subscription_user | t       | {testpub2,testpub3} | f
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub DISABLE;
 \dRs
-                            List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     
------------------+---------------------------+---------+---------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3}
+                                List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary 
+-----------------+---------------------------+---------+---------------------+--------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f
 (1 row)
 
 COMMIT;
@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                        List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | local              | dbname=regress_doesnotexist2
+                                                            List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -155,6 +155,22 @@ DROP SUBSCRIPTION IF EXISTS regress_testsub;
 NOTICE:  subscription "regress_testsub" does not exist, skipping
 DROP SUBSCRIPTION regress_testsub;  -- fail
 ERROR:  subscription "regress_testsub" does not exist
+-- fail - binary must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo);
+ERROR:  binary requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (binary = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 9e234ab8b3..835bd05721 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -117,6 +117,18 @@ COMMIT;
 DROP SUBSCRIPTION IF EXISTS regress_testsub;
 DROP SUBSCRIPTION regress_testsub;  -- fail
 
+-- fail - binary must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (binary = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
new file mode 100644
index 0000000000..a252f470ef
--- /dev/null
+++ b/src/test/subscription/t/014_binary.pl
@@ -0,0 +1,108 @@
+# Binary mode logical replication test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Create and initialize a publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create and initialize subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $ddl = qq(
+	CREATE TABLE public.test_numerical (
+		a INTEGER PRIMARY KEY,
+		b NUMERIC,
+		c FLOAT,
+		d BIGINT
+		);
+	CREATE TABLE public.test_arrays (
+		a INTEGER[] PRIMARY KEY,
+		b NUMERIC[],
+		c TEXT[]
+		););
+
+# Create content on both sides of the replication
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Configure logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tpub FOR ALL TABLES");
+
+my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' " .
+	"PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
+
+# Ensure nodes are in sync with eachother
+$node_publisher->wait_for_catchup('tsub');
+$node_subscriber->poll_query_until('postgres',
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');")
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert some content and make sure it's replicated across
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO public.test_arrays (a, b, c) VALUES
+		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
+		('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+
+	INSERT INTO public.test_numerical (a, b, c, d) VALUES
+		(1, 1.2, 1.3, 10),
+		(2, 2.2, 2.3, 20),
+		(3, 3.2, 3.3, 30);
+	));
+
+$node_publisher->wait_for_catchup('tsub');
+
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is($result, '1|1.2|1.3|10
+2|2.2|2.3|20
+3|3.2|3.3|30', 'check replicated data on subscriber');
+
+# Test to reset back to text formatting, and then to binary again
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tsub SET (binary = false);");
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO public.test_numerical (a, b, c, d) VALUES
+		(4, 4.2, 4.3, 40);
+	));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is($result, '1|1.2|1.3|10
+2|2.2|2.3|20
+3|3.2|3.3|30
+4|4.2|4.3|40', 'check replicated data on subscriber');
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tsub SET (binary = true);");
+
+$node_publisher->safe_psql('postgres', qq(
+	INSERT INTO public.test_arrays (a, b, c) VALUES
+		('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}');
+	));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT a, b, c FROM test_arrays ORDER BY a");
+
+is($result, '{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{2,3,1}|{1.2,1.3,1.1}|{two,three,one}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check replicated data on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.21.1 (Apple Git-122.3)

