From a5241e0024a9037c1b4fc2da602e8d77aab8742f Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Fri, 14 Jun 2019 15:39:47 -0400
Subject: [PATCH 2/7] add binary column to pg_subscription bump catversion
 support create and alter subcription with binary option

---
 src/backend/catalog/system_views.sql          |  2 +-
 src/backend/commands/subscriptioncmds.c       | 39 +++++++++++++++----
 .../libpqwalreceiver/libpqwalreceiver.c       |  3 ++
 src/backend/replication/logical/worker.c      |  1 +
 src/include/catalog/catversion.h              |  2 +-
 src/include/catalog/pg_subscription.h         |  4 ++
 src/include/replication/walreceiver.h         |  1 +
 7 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index f7800f01a6..3096cf2e5f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1063,7 +1063,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
     ON pg_subscription TO public;
 
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5408edcfc2..fd5057356a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -58,7 +58,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh)
+						   bool *refresh, bool *binary_given, bool *binary)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -89,6 +89,12 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*synchronous_commit = NULL;
 	if (refresh)
 		*refresh = true;
+	if (binary)
+	{
+		*binary_given = false;
+		/* not all versions of pgoutput will understand this option default to false */
+		*binary = false;
+	}
 
 	/* Parse options */
 	foreach(lc, options)
@@ -174,6 +180,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		{
+			*binary_given = true;
+			*binary = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -323,8 +334,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		slotname_given;
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
-	List	   *publications;
+	bool		binary;
+	bool		binary_given;
 
+	List	   *publications;
 	/*
 	 * Parse and check options.
 	 *
@@ -333,7 +346,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
 							   &enabled, &create_slot, &slotname_given,
 							   &slotname, &copy_data, &synchronous_commit,
-							   NULL);
+							   NULL, &binary_given, &binary);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -399,6 +412,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (slotname)
@@ -669,10 +683,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
+				bool		binary_given;
+				bool		binary;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL);
+										   NULL, &synchronous_commit, NULL,
+										   &binary_given, &binary);
 
 				if (slotname_given)
 				{
@@ -697,6 +714,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
+				if (binary_given)
+				{
+				values[Anum_pg_subscription_subbinary - 1] =
+					BoolGetDatum(binary);
+					replaces[Anum_pg_subscription_subbinary - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -708,7 +732,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL, NULL,
+										   NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -746,7 +771,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh);
+										   NULL, &refresh, NULL, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -783,7 +808,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 545d2fcd05..c4daea33d2 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -400,6 +400,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		char	   *pubnames_str;
 		List	   *pubnames;
 		char	   *pubnames_literal;
+		bool		binary;
 
 		appendStringInfoString(&cmd, " (");
 
@@ -421,6 +422,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
 		PQfreemem(pubnames_literal);
 		pfree(pubnames_str);
+		if (options->proto.logical.binary)
+			appendStringInfo(&cmd, ", binary 'true'");
 
 		appendStringInfoChar(&cmd, ')');
 	}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9a70649134..10a776325a 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1800,6 +1800,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.slotname = myslotname;
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.binary = MySubscription->binary;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 3a50ba0dfe..1a3fcbf075 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
  */
 
 /*							yyyymmddN */
-#define CATALOG_VERSION_NO	201911241
+#define CATALOG_VERSION_NO	201912021
 
 #endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 3cb13d897e..bb44e5e45c 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	bool		subbinary;		/* True if the subscription wants the
+								 * output plugin to send data in binary */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +76,7 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	bool		binary;			/* Indicates if the subscription wants data in binary format */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 41714eaf0c..49ec831bc4 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -162,6 +162,7 @@ typedef struct
 		{
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
+			bool		binary;				/* Ask publisher output plugin to use binary */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
-- 
2.20.1 (Apple Git-117)

