Hi everyone,

I have made a patch that introduces support for the libpq binary protocol
in postgres_fdw. The idea is simple: when a user knows that the foreign
server is binary-compatible with the local one and their workload could
benefit from using the binary protocol, it can be switched on for a
particular server or even a particular table.

The patch adds a new foreign server and table option, 'binary_format'
(off by default), and implements serialization/deserialization of query
results and parameters for the binary protocol. I have tested the patch by
switching the foreign servers in the postgres_fdw.sql tests to
binary_format; the only diff was in the text of the error for parsing an
invalid integer value, so it worked as expected for the tests. There are a
few minor issues I don't like in the code, and I have yet to write the
tests and docs for it. It would be great to get some feedback and
understand whether this is a welcome feature before proceeding with all
of the above.

Thanks,
Ilya Gladyshev
From 2cb72df03ed94d55cf51531a2d21a7d3369ae27b Mon Sep 17 00:00:00 2001
From: Ilya Gladyshev <ilya.v.gladys...@gmail.com>
Date: Sat, 19 Nov 2022 17:47:49 +0400
Subject: [PATCH] postgres_fdw libpq binary proto support

---
 contrib/postgres_fdw/option.c       |   6 +-
 contrib/postgres_fdw/postgres_fdw.c | 389 ++++++++++++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h |   1 +
 3 files changed, 338 insertions(+), 58 deletions(-)

diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index fa80ee2a55..f96cb79b42 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -125,7 +125,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
 			strcmp(def->defname, "truncatable") == 0 ||
 			strcmp(def->defname, "async_capable") == 0 ||
 			strcmp(def->defname, "parallel_commit") == 0 ||
-			strcmp(def->defname, "keep_connections") == 0)
+			strcmp(def->defname, "keep_connections") == 0 ||
+			strcmp(def->defname, "binary_format") == 0)
 		{
 			/* these accept only boolean values */
 			(void) defGetBoolean(def);
@@ -253,6 +254,9 @@ InitPgFdwOptions(void)
 		/* async_capable is available on both server and table */
 		{"async_capable", ForeignServerRelationId, false},
 		{"async_capable", ForeignTableRelationId, false},
+		/* binary_format is available on both server and table */
+		{"binary_format", ForeignServerRelationId, false},
+		{"binary_format", ForeignTableRelationId, false},
 		{"parallel_commit", ForeignServerRelationId, false},
 		{"keep_connections", ForeignServerRelationId, false},
 		{"password_required", UserMappingRelationId, false},
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 8d7500abfb..9344b6f5fc 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -76,6 +76,8 @@ enum FdwScanPrivateIndex
 	FdwScanPrivateRetrievedAttrs,
 	/* Integer representing the desired fetch_size */
 	FdwScanPrivateFetchSize,
+	/* Boolean flag showing whether to use binary or text libpq protocol */
+	FdwScanPrivateBinaryFormat,
 
 	/*
 	 * String describing join i.e. names of relations being joined and types
@@ -128,7 +130,8 @@ enum FdwDirectModifyPrivateIndex
 	/* Integer list of attribute numbers retrieved by RETURNING */
 	FdwDirectModifyPrivateRetrievedAttrs,
 	/* set-processed flag (as a Boolean node) */
-	FdwDirectModifyPrivateSetProcessed
+	FdwDirectModifyPrivateSetProcessed,
+	FdwDirectModifyPrivateBinaryFormat
 };
 
 /*
@@ -154,6 +157,7 @@ typedef struct PgFdwScanState
 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
 	List	   *param_exprs;	/* executable expressions for param values */
 	const char **param_values;	/* textual values of query parameters */
+	int *param_lengths;
 
 	/* for storing result tuples */
 	HeapTuple  *tuples;			/* array of currently-retrieved tuples */
@@ -172,6 +176,7 @@ typedef struct PgFdwScanState
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
 	int			fetch_size;		/* number of tuples per fetch */
+	bool binary_format; /* whether to use libpq binary or text format */
 } PgFdwScanState;
 
 /*
@@ -195,6 +200,7 @@ typedef struct PgFdwModifyState
 	int			batch_size;		/* value of FDW option "batch_size" */
 	bool		has_returning;	/* is there a RETURNING clause? */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
+	bool binary_format;
 
 	/* info about parameters for prepared statement */
 	AttrNumber	ctidAttno;		/* attnum of input resjunk ctid column */
@@ -225,7 +231,7 @@ typedef struct PgFdwDirectModifyState
 	bool		has_returning;	/* is there a RETURNING clause? */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by RETURNING */
 	bool		set_processed;	/* do we set the command es_processed? */
-
+	bool binary_format;
 	/* for remote query execution */
 	PGconn	   *conn;			/* connection for the update */
 	PgFdwConnState *conn_state; /* extra per-connection state */
@@ -233,6 +239,7 @@ typedef struct PgFdwDirectModifyState
 	FmgrInfo   *param_flinfo;	/* output conversion functions for them */
 	List	   *param_exprs;	/* executable expressions for param values */
 	const char **param_values;	/* textual values of query parameters */
+	int *param_lengths;
 
 	/* for storing result tuples */
 	PGresult   *result;			/* result for query */
@@ -256,6 +263,7 @@ typedef struct PgFdwAnalyzeState
 	Relation	rel;			/* relcache entry for the foreign table */
 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
 	List	   *retrieved_attrs;	/* attr numbers retrieved by query */
+	bool binary_format;
 
 	/* collected sample rows */
 	HeapTuple  *rows;			/* array of size targrows */
@@ -470,7 +478,8 @@ static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
 											 ItemPointer tupleid,
 											 TupleTableSlot **slots,
-											 int numSlots);
+											 int numSlots,
+											 int **p_lengths);
 static void store_returning_result(PgFdwModifyState *fmstate,
 								   TupleTableSlot *slot, PGresult *res);
 static void finish_foreign_modify(PgFdwModifyState *fmstate);
@@ -492,11 +501,15 @@ static void prepare_query_params(PlanState *node,
 								 int numParams,
 								 FmgrInfo **param_flinfo,
 								 List **param_exprs,
-								 const char ***param_values);
+								 const char ***param_values,
+								 int **param_lengths,
+								 bool binary_format);
 static void process_query_params(ExprContext *econtext,
 								 FmgrInfo *param_flinfo,
 								 List *param_exprs,
-								 const char **param_values);
+								 const char **param_values,
+								 int *param_lengths,
+								 bool binary_format);
 static int	postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 										  HeapTuple *rows, int targrows,
 										  double *totalrows,
@@ -512,7 +525,8 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
 											AttInMetadata *attinmeta,
 											List *retrieved_attrs,
 											ForeignScanState *fsstate,
-											MemoryContext temp_context);
+											MemoryContext temp_context,
+											bool binary_format);
 static void conversion_error_callback(void *arg);
 static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
 							JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
@@ -540,8 +554,14 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo);
 static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_o,
 							  const PgFdwRelationInfo *fpinfo_i);
-static int	get_batch_size_option(Relation rel);
+static bool get_binary_format_option(ForeignServer *server, ForeignTable *table);
+static int	get_batch_size_option(ForeignServer *server, ForeignTable *table);
+static ForeignScan *find_modifytable_subplan(PlannerInfo *root,
+											 ModifyTable *plan,
+											 Index rtindex,
+											 int subplan_index);
 
+static AttInMetadata *TupleDescGetAttInBinaryMetadata(TupleDesc tupdesc);
 
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
@@ -1404,9 +1424,10 @@ postgresGetForeignPlan(PlannerInfo *root,
 	 * Build the fdw_private list that will be available to the executor.
 	 * Items in the list must match order in enum FdwScanPrivateIndex.
 	 */
-	fdw_private = list_make3(makeString(sql.data),
+	fdw_private = list_make4(makeString(sql.data),
 							 retrieved_attrs,
-							 makeInteger(fpinfo->fetch_size));
+							 makeInteger(fpinfo->fetch_size),
+							 makeBoolean(fpinfo->binary_format));
 	if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
 		fdw_private = lappend(fdw_private,
 							  makeString(fpinfo->relation_name));
@@ -1542,6 +1563,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 												 FdwScanPrivateRetrievedAttrs);
 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
 										  FdwScanPrivateFetchSize));
+	fsstate->binary_format = boolVal(list_nth(fsplan->fdw_private,
+											  FdwScanPrivateBinaryFormat));
 
 	/* Create contexts for batches of tuples and per-tuple temp workspace. */
 	fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -1566,8 +1589,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 		fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node);
 	}
 
-	fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
-
+	if (fsstate->binary_format)
+		fsstate->attinmeta = TupleDescGetAttInBinaryMetadata(fsstate->tupdesc);
+	else
+		fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
 	/*
 	 * Prepare for processing of parameters used in remote query, if any.
 	 */
@@ -1579,7 +1604,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 							 numParams,
 							 &fsstate->param_flinfo,
 							 &fsstate->param_exprs,
-							 &fsstate->param_values);
+							 &fsstate->param_values,
+							 &fsstate->param_lengths,
+							 fsstate->binary_format);
 
 	/* Set the async-capable flag */
 	fsstate->async_capable = node->ss.ps.async_capable;
@@ -2038,7 +2065,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
 	if (fmstate)
 		batch_size = fmstate->batch_size;
 	else
-		batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
+	{
+		ForeignServer *fs;
+		ForeignTable *ft;
+
+		ft = GetForeignTable(RelationGetRelid(resultRelInfo->ri_RelationDesc));
+		fs = GetForeignServer(ft->serverid);
+
+		batch_size = get_batch_size_option(fs, ft);
+	}
 
 	/*
 	 * Disable batching when we have to use RETURNING, there are any
@@ -2593,10 +2628,11 @@ postgresPlanDirectModify(PlannerInfo *root,
 	 * Update the fdw_private list that will be available to the executor.
 	 * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
 	 */
-	fscan->fdw_private = list_make4(makeString(sql.data),
+	fscan->fdw_private = list_make5(makeString(sql.data),
 									makeBoolean((retrieved_attrs != NIL)),
 									retrieved_attrs,
-									makeBoolean(plan->canSetTag));
+									makeBoolean(plan->canSetTag),
+									makeBoolean(fpinfo->binary_format));
 
 	/*
 	 * Update the foreign-join-related fields.
@@ -2701,6 +2737,8 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 	dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private,
 											  FdwDirectModifyPrivateSetProcessed));
 
+	dmstate->binary_format = boolVal(list_nth(fsplan->fdw_private,
+											  FdwDirectModifyPrivateBinaryFormat));
 	/* Create context for per-tuple temp workspace. */
 	dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
 											  "postgres_fdw temporary data",
@@ -2716,7 +2754,10 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 		else
 			tupdesc = RelationGetDescr(dmstate->rel);
 
-		dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+		if (dmstate->binary_format)
+			dmstate->attinmeta = TupleDescGetAttInBinaryMetadata(tupdesc);
+		else
+			dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
 		/*
 		 * When performing an UPDATE/DELETE .. RETURNING on a join directly,
@@ -2738,7 +2779,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
 							 numParams,
 							 &dmstate->param_flinfo,
 							 &dmstate->param_exprs,
-							 &dmstate->param_values);
+							 &dmstate->param_values,
+							 &dmstate->param_lengths,
+							 dmstate->binary_format);
 }
 
 /*
@@ -3711,9 +3754,11 @@ create_cursor(ForeignScanState *node)
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 	int			numParams = fsstate->numParams;
 	const char **values = fsstate->param_values;
+	int *formats = NULL;
 	PGconn	   *conn = fsstate->conn;
 	StringInfoData buf;
 	PGresult   *res;
+	bool binary = fsstate->binary_format;
 
 	/* First, process a pending asynchronous request, if any. */
 	if (fsstate->conn_state->pendingAreq)
@@ -3733,15 +3778,28 @@ create_cursor(ForeignScanState *node)
 		process_query_params(econtext,
 							 fsstate->param_flinfo,
 							 fsstate->param_exprs,
-							 values);
+							 values,
+							 fsstate->param_lengths,
+							 binary);
 
 		MemoryContextSwitchTo(oldcontext);
+
+		if (binary)
+		{
+			int i;
+
+			formats = palloc(sizeof(int) * numParams);
+			for (i = 0; i < numParams; i++)
+				formats[i] = 1;
+		}
 	}
 
 	/* Construct the DECLARE CURSOR command */
 	initStringInfo(&buf);
-	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
-					 fsstate->cursor_number, fsstate->query);
+	appendStringInfo(&buf, "DECLARE c%u %sCURSOR FOR\n%s",
+					 fsstate->cursor_number,
+					 binary ? "BINARY " : "",
+					 fsstate->query);
 
 	/*
 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
@@ -3751,7 +3809,7 @@ create_cursor(ForeignScanState *node)
 	 * server has the same OIDs we do for the parameters' types.
 	 */
 	if (!PQsendQueryParams(conn, buf.data, numParams,
-						   NULL, values, NULL, NULL, 0))
+						   NULL, values, fsstate->param_lengths, formats, binary ? 1 : 0))
 		pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
 
 	/*
@@ -3848,7 +3906,8 @@ fetch_more_data(ForeignScanState *node)
 										   fsstate->attinmeta,
 										   fsstate->retrieved_attrs,
 										   node,
-										   fsstate->temp_cxt);
+										   fsstate->temp_cxt,
+										   fsstate->binary_format);
 		}
 
 		/* Update fetch_ct_2 */
@@ -3969,11 +4028,13 @@ create_foreign_modify(EState *estate,
 	TupleDesc	tupdesc = RelationGetDescr(rel);
 	Oid			userid;
 	ForeignTable *table;
+	ForeignServer *server;
 	UserMapping *user;
 	AttrNumber	n_params;
 	Oid			typefnoid;
 	bool		isvarlena;
 	ListCell   *lc;
+	bool binary_format;
 
 	/* Begin constructing PgFdwModifyState. */
 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
@@ -3987,6 +4048,7 @@ create_foreign_modify(EState *estate,
 
 	/* Get info about foreign table. */
 	table = GetForeignTable(RelationGetRelid(rel));
+	server = GetForeignServer(table->serverid);
 	user = GetUserMapping(userid, table->serverid);
 
 	/* Open connection; report that we'll create a prepared statement. */
@@ -4000,6 +4062,9 @@ create_foreign_modify(EState *estate,
 		fmstate->query = pstrdup(fmstate->query);
 		fmstate->orig_query = pstrdup(fmstate->query);
 	}
+	binary_format = get_binary_format_option(server, table);
+
+	fmstate->binary_format = binary_format;
 	fmstate->target_attrs = target_attrs;
 	fmstate->values_end = values_end;
 	fmstate->has_returning = has_returning;
@@ -4012,7 +4077,12 @@ create_foreign_modify(EState *estate,
 
 	/* Prepare for input conversion of RETURNING results. */
 	if (fmstate->has_returning)
-		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+	{
+		if (binary_format)
+			fmstate->attinmeta = TupleDescGetAttInBinaryMetadata(tupdesc);
+		else
+			fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+	}
 
 	/* Prepare for output conversion of parameters used in prepared stmt. */
 	n_params = list_length(fmstate->target_attrs) + 1;
@@ -4030,7 +4100,10 @@ create_foreign_modify(EState *estate,
 			elog(ERROR, "could not find junk ctid column");
 
 		/* First transmittable parameter will be ctid */
-		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+		if (binary_format)
+			getTypeBinaryOutputInfo(TIDOID, &typefnoid, &isvarlena);
+		else
+			getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
 		fmstate->p_nums++;
 	}
@@ -4048,7 +4121,11 @@ create_foreign_modify(EState *estate,
 			/* Ignore generated columns; they are set to DEFAULT */
 			if (attr->attgenerated)
 				continue;
-			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+
+			if (binary_format)
+				getTypeBinaryOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+			else
+				getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
 			fmstate->p_nums++;
 		}
@@ -4058,7 +4135,7 @@ create_foreign_modify(EState *estate,
 
 	/* Set batch_size from foreign server/table options. */
 	if (operation == CMD_INSERT)
-		fmstate->batch_size = get_batch_size_option(rel);
+		fmstate->batch_size = get_batch_size_option(server, table);
 
 	fmstate->num_slots = 1;
 
@@ -4088,6 +4165,9 @@ execute_foreign_modify(EState *estate,
 	const char **p_values;
 	PGresult   *res;
 	int			n_rows;
+	int *p_formats = NULL;
+	int numParams = fmstate->p_nums * (*numSlots);
+	int *p_lengths = NULL;
 	StringInfoData sql;
 
 	/* The operation should be INSERT, UPDATE, or DELETE */
@@ -4142,7 +4222,17 @@ execute_foreign_modify(EState *estate,
 	}
 
 	/* Convert parameters needed by prepared statement to text form */
-	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
+	p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots,
+										&p_lengths);
+
+	if (fmstate->binary_format)
+	{
+		int i;
+
+		p_formats = palloc(sizeof(int) * numParams);
+		for (i = 0; i < numParams; i++)
+			p_formats[i] = 1;
+	}
 
 	/*
 	 * Execute the prepared statement.
@@ -4151,9 +4241,9 @@ execute_foreign_modify(EState *estate,
 							 fmstate->p_name,
 							 fmstate->p_nums * (*numSlots),
 							 p_values,
-							 NULL,
-							 NULL,
-							 0))
+							 p_lengths,
+							 p_formats,
+							 fmstate->binary_format ? 1 : 0))
 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
 
 	/*
@@ -4254,17 +4344,24 @@ static const char **
 convert_prep_stmt_params(PgFdwModifyState *fmstate,
 						 ItemPointer tupleid,
 						 TupleTableSlot **slots,
-						 int numSlots)
+						 int numSlots,
+						 int **param_lengths)
 {
 	const char **p_values;
 	int			i;
 	int			j;
 	int			pindex = 0;
 	MemoryContext oldcontext;
+	int *p_lengths = NULL;
+
 
 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
 
 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
+	if (fmstate->binary_format)
+		p_lengths = palloc(sizeof(int) * fmstate->p_nums * numSlots);
+
+	*param_lengths = p_lengths;
 
 	/* ctid is provided only for UPDATE/DELETE, which don't allow batching */
 	Assert(!(tupleid != NULL && numSlots > 1));
@@ -4274,8 +4371,18 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 	{
 		Assert(numSlots == 1);
 		/* don't need set_transmission_modes for TID output */
-		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
-											  PointerGetDatum(tupleid));
+		if (fmstate->binary_format)
+		{
+			bytea *val;
+
+			val = SendFunctionCall(&fmstate->p_flinfo[pindex],
+								   PointerGetDatum(tupleid));
+			p_values[pindex] = VARDATA(val);
+			p_lengths[pindex] = VARSIZE(val) - VARHDRSZ;
+		}
+		else
+			p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+												  PointerGetDatum(tupleid));
 		pindex++;
 	}
 
@@ -4303,7 +4410,21 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
 					continue;
 				value = slot_getattr(slots[i], attnum, &isnull);
 				if (isnull)
+				{
 					p_values[pindex] = NULL;
+
+					/* Binary params are parsed as NULL if length is -1 */
+					if (fmstate->binary_format)
+						p_lengths[pindex] = -1;
+				}
+				else if (fmstate->binary_format)
+				{
+					bytea *val;
+
+					val = SendFunctionCall(&fmstate->p_flinfo[j], value);
+					p_values[pindex] = VARDATA(val);
+					p_lengths[pindex] = VARSIZE(val) - VARHDRSZ;
+				}
 				else
 					p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
 														  value);
@@ -4342,7 +4463,8 @@ store_returning_result(PgFdwModifyState *fmstate,
 											fmstate->attinmeta,
 											fmstate->retrieved_attrs,
 											NULL,
-											fmstate->temp_cxt);
+											fmstate->temp_cxt,
+											fmstate->binary_format);
 
 		/*
 		 * The returning slot will not necessarily be suitable to store
@@ -4543,6 +4665,8 @@ execute_dml_stmt(ForeignScanState *node)
 	ExprContext *econtext = node->ss.ps.ps_ExprContext;
 	int			numParams = dmstate->numParams;
 	const char **values = dmstate->param_values;
+	int *formats = NULL;
+	bool binary = dmstate->binary_format;
 
 	/* First, process a pending asynchronous request, if any. */
 	if (dmstate->conn_state->pendingAreq)
@@ -4552,11 +4676,24 @@ execute_dml_stmt(ForeignScanState *node)
 	 * Construct array of query parameter values in text format.
 	 */
 	if (numParams > 0)
+	{
 		process_query_params(econtext,
 							 dmstate->param_flinfo,
 							 dmstate->param_exprs,
-							 values);
+							 values,
+							 dmstate->param_lengths,
+							 dmstate->binary_format);
+
+		if (binary)
+		{
+			int i;
 
+			formats = palloc(sizeof(int) * numParams);
+			for (i = 0; i < numParams; i++)
+				formats[i] = 1;
+		}
+
+	}
 	/*
 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
 	 * to infer types for all parameters.  Since we explicitly cast every
@@ -4565,7 +4702,7 @@ execute_dml_stmt(ForeignScanState *node)
 	 * server has the same OIDs we do for the parameters' types.
 	 */
 	if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
-						   NULL, values, NULL, NULL, 0))
+						   NULL, values, dmstate->param_lengths, formats, binary ? 1 : 0))
 		pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
 
 	/*
@@ -4635,7 +4772,8 @@ get_returning_data(ForeignScanState *node)
 												dmstate->attinmeta,
 												dmstate->retrieved_attrs,
 												node,
-												dmstate->temp_cxt);
+												dmstate->temp_cxt,
+												dmstate->binary_format);
 			ExecStoreHeapTuple(newtup, slot, false);
 		}
 		PG_CATCH();
@@ -4841,7 +4979,9 @@ prepare_query_params(PlanState *node,
 					 int numParams,
 					 FmgrInfo **param_flinfo,
 					 List **param_exprs,
-					 const char ***param_values)
+					 const char ***param_values,
+					 int **param_lengths,
+					 bool binary_format)
 {
 	int			i;
 	ListCell   *lc;
@@ -4857,8 +4997,14 @@ prepare_query_params(PlanState *node,
 		Node	   *param_expr = (Node *) lfirst(lc);
 		Oid			typefnoid;
 		bool		isvarlena;
-
-		getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
+		if (binary_format)
+		{
+			getTypeBinaryOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
+		}
+		else
+		{
+			getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
+		}
 		fmgr_info(typefnoid, &(*param_flinfo)[i]);
 		i++;
 	}
@@ -4875,6 +5021,8 @@ prepare_query_params(PlanState *node,
 
 	/* Allocate buffer for text form of query parameters. */
 	*param_values = (const char **) palloc0(numParams * sizeof(char *));
+	if (binary_format)
+		*param_lengths = (int *) palloc0(numParams * sizeof(int));
 }
 
 /*
@@ -4884,7 +5032,9 @@ static void
 process_query_params(ExprContext *econtext,
 					 FmgrInfo *param_flinfo,
 					 List *param_exprs,
-					 const char **param_values)
+					 const char **param_values,
+					 int *param_lengths,
+					 bool binary_format)
 {
 	int			nestlevel;
 	int			i;
@@ -4907,7 +5057,19 @@ process_query_params(ExprContext *econtext,
 		 * type-specific output function, unless the value is null.
 		 */
 		if (isNull)
+		{
 			param_values[i] = NULL;
+
+			/* Binary params are parsed as NULL if length is -1 */
+			if (binary_format)
+				param_lengths[i] = -1;
+		}
+		else if (binary_format)
+		{
+			bytea *val = SendFunctionCall(&param_flinfo[i], expr_value);
+			param_values[i] = VARDATA(val);
+			param_lengths[i] = VARSIZE(val) - VARHDRSZ;
+		}
 		else
 			param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
 
@@ -5011,7 +5173,6 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 
 	/* Initialize workspace state */
 	astate.rel = relation;
-	astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
 
 	astate.rows = rows;
 	astate.targrows = targrows;
@@ -5035,12 +5196,17 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
 	conn = GetConnection(user, false, NULL);
 
+	astate.binary_format = get_binary_format_option(server, table);
+	if (astate.binary_format)
+		astate.attinmeta = TupleDescGetAttInBinaryMetadata(RelationGetDescr(relation));
+	else
+		astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));;
 	/*
 	 * Construct cursor that retrieves whole rows from remote.
 	 */
 	cursor_number = GetCursorNumber(conn);
 	initStringInfo(&sql);
-	appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
+	appendStringInfo(&sql, "DECLARE c%u %sCURSOR FOR ", cursor_number, astate.binary_format ? "BINARY " : "");
 	deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs);
 
 	/* In what follows, do not risk leaking any PGresults. */
@@ -5212,7 +5378,8 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
 													   astate->attinmeta,
 													   astate->retrieved_attrs,
 													   NULL,
-													   astate->temp_cxt);
+													   astate->temp_cxt,
+													   astate->binary_format);
 
 		MemoryContextSwitchTo(oldcontext);
 	}
@@ -5922,6 +6089,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
 			(void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
 		else if (strcmp(def->defname, "async_capable") == 0)
 			fpinfo->async_capable = defGetBoolean(def);
+		else if (strcmp(def->defname, "binary_format") == 0)
+			fpinfo->binary_format = defGetBoolean(def);
 	}
 }
 
@@ -5945,6 +6114,9 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
 			(void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
 		else if (strcmp(def->defname, "async_capable") == 0)
 			fpinfo->async_capable = defGetBoolean(def);
+		else if (strcmp(def->defname, "binary_format") == 0)
+			fpinfo->binary_format = defGetBoolean(def);
+
 	}
 }
 
@@ -5980,6 +6152,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
 	fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
 	fpinfo->fetch_size = fpinfo_o->fetch_size;
 	fpinfo->async_capable = fpinfo_o->async_capable;
+	fpinfo->binary_format = fpinfo_o->binary_format;
 
 	/* Merge the table level options from either side of the join. */
 	if (fpinfo_i)
@@ -6011,6 +6184,9 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
 		 */
 		fpinfo->async_capable = fpinfo_o->async_capable ||
 			fpinfo_i->async_capable;
+
+		/* Join will use binary proto only if both the outer and inner rels are configured to use it. */
+		fpinfo->binary_format = fpinfo_o->binary_format && fpinfo_i->binary_format;
 	}
 }
 
@@ -7209,7 +7385,8 @@ make_tuple_from_result_row(PGresult *res,
 						   AttInMetadata *attinmeta,
 						   List *retrieved_attrs,
 						   ForeignScanState *fsstate,
-						   MemoryContext temp_context)
+						   MemoryContext temp_context,
+						   bool binary_format)
 {
 	HeapTuple	tuple;
 	TupleDesc	tupdesc;
@@ -7221,6 +7398,10 @@ make_tuple_from_result_row(PGresult *res,
 	MemoryContext oldcontext;
 	ListCell   *lc;
 	int			j;
+	StringInfo buf = NULL;
+
+	if (binary_format)
+		buf = makeStringInfo();
 
 	Assert(row < PQntuples(res));
 
@@ -7274,6 +7455,11 @@ make_tuple_from_result_row(PGresult *res,
 		else
 			valstr = PQgetvalue(res, row, j);
 
+		if (binary_format && valstr != NULL)
+		{
+			resetStringInfo(buf);
+			appendBinaryStringInfo(buf, valstr, PQgetlength(res, row, j));
+		}
 		/*
 		 * convert value to internal representation
 		 *
@@ -7286,10 +7472,20 @@ make_tuple_from_result_row(PGresult *res,
 			Assert(i <= tupdesc->natts);
 			nulls[i - 1] = (valstr == NULL);
 			/* Apply the input function even to nulls, to support domains */
-			values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
-											  valstr,
-											  attinmeta->attioparams[i - 1],
-											  attinmeta->atttypmods[i - 1]);
+			if (binary_format)
+			{
+				values[i - 1] = ReceiveFunctionCall(&attinmeta->attinfuncs[i - 1],
+													nulls[i - 1] ? NULL : buf,
+													attinmeta->attioparams[i - 1],
+													attinmeta->atttypmods[i - 1]);
+			}
+			else
+			{
+				values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1],
+												  valstr,
+												  attinmeta->attioparams[i - 1],
+												  attinmeta->atttypmods[i - 1]);
+			}
 		}
 		else if (i == SelfItemPointerAttributeNumber)
 		{
@@ -7298,7 +7494,10 @@ make_tuple_from_result_row(PGresult *res,
 			{
 				Datum		datum;
 
-				datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
+				if (binary_format)
+					datum = DirectFunctionCall1(tidrecv, PointerGetDatum(buf));
+				else
+					datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
 				ctid = (ItemPointer) DatumGetPointer(datum);
 			}
 		}
@@ -7556,16 +7755,46 @@ find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec,
 	return NULL;
 }
 
+/* Return the effective binary_format setting; a table-level option overrides the server-level one. */
+static bool
+get_binary_format_option(ForeignServer *server, ForeignTable *table)
+{
+	List	   *options;
+	ListCell   *lc;
+
+	/* By default, text protocol is used */
+	bool binary_format = false;
+
+	/*
+	 * Load options for table and server. We append server options after table
+	 * options, because table options take precedence.
+	 */
+	options = NIL;
+	options = list_concat(options, table->options);
+	options = list_concat(options, server->options);
+
+	/* See if either table or server specifies binary_format. */
+	foreach(lc, options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "binary_format") == 0)
+		{
+			(void) parse_bool(defGetString(def), &binary_format);
+			break;
+		}
+	}
+	list_free(options);
+	return binary_format;
+}
+
 /*
  * Determine batch size for a given foreign table. The option specified for
  * a table has precedence.
  */
 static int
-get_batch_size_option(Relation rel)
+get_batch_size_option(ForeignServer *server, ForeignTable *table)
 {
-	Oid			foreigntableid = RelationGetRelid(rel);
-	ForeignTable *table;
-	ForeignServer *server;
 	List	   *options;
 	ListCell   *lc;
 
@@ -7576,9 +7805,6 @@ get_batch_size_option(Relation rel)
 	 * Load options for table and server. We append server options after table
 	 * options, because table options take precedence.
 	 */
-	table = GetForeignTable(foreigntableid);
-	server = GetForeignServer(table->serverid);
-
 	options = NIL;
 	options = list_concat(options, table->options);
 	options = list_concat(options, server->options);
@@ -7595,5 +7821,54 @@ get_batch_size_option(Relation rel)
 		}
 	}
 
+	list_free(options);
 	return batch_size;
 }
+
+/*
+ * TupleDescGetAttInBinaryMetadata - Basically a copy of TupleDescGetAttInMetadata
+ * where the text input function is replaced by the binary input (receive) function.
+ */
+static AttInMetadata *
+TupleDescGetAttInBinaryMetadata(TupleDesc tupdesc)
+{
+	int			natts = tupdesc->natts;
+	int			i;
+	Oid			atttypeid;
+	Oid			attinfuncid;
+	FmgrInfo   *attinfuncinfo;
+	Oid		   *attioparams;
+	int32	   *atttypmods;
+	AttInMetadata *attinmeta;
+
+	attinmeta = (AttInMetadata *) palloc(sizeof(AttInMetadata));
+
+	/* "Bless" the tupledesc so that we can make rowtype datums with it */
+	attinmeta->tupdesc = BlessTupleDesc(tupdesc);
+
+	/*
+	 * Gather info needed later to call the binary input function for each attribute
+	 */
+	attinfuncinfo = (FmgrInfo *) palloc0(natts * sizeof(FmgrInfo));
+	attioparams = (Oid *) palloc0(natts * sizeof(Oid));
+	atttypmods = (int32 *) palloc0(natts * sizeof(int32));
+
+	for (i = 0; i < natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
+
+		/* Ignore dropped attributes; their zeroed entries are left untouched */
+		if (!att->attisdropped)
+		{
+			atttypeid = att->atttypid;
+			getTypeBinaryInputInfo(atttypeid, &attinfuncid, &attioparams[i]);
+			fmgr_info(attinfuncid, &attinfuncinfo[i]);
+			atttypmods[i] = att->atttypmod;
+		}
+	}
+	attinmeta->attinfuncs = attinfuncinfo;
+	attinmeta->attioparams = attioparams;
+	attinmeta->atttypmods = atttypmods;
+
+	return attinmeta;
+}
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a11d45bedf..993d3f026a 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -80,6 +80,7 @@ typedef struct PgFdwRelationInfo
 	Cost		fdw_tuple_cost;
 	List	   *shippable_extensions;	/* OIDs of shippable extensions */
 	bool		async_capable;
+	bool binary_format;
 
 	/* Cached catalog information. */
 	ForeignTable *table;
-- 
2.30.2

Reply via email to