Hi everyone, I have made a patch that introduces support for the libpq binary protocol in postgres_fdw. The idea is simple: when a user knows that the foreign server is binary compatible with the local one and that their workload could benefit from the binary protocol, it can be switched on for a particular server or even a particular table.
The patch adds a new foreign server and table option 'binary_format' (off by default) and implements serialization/deserialization of query results and parameters for the binary protocol. I have tested the patch by switching the foreign servers in the postgres_fdw.sql tests to binary mode; the only diff was in the text of the error for parsing an invalid integer value, so it worked as expected for the test. There are a few minor issues I don't like in the code, and I have yet to write the tests and docs for it. It would be great to get some feedback and understand whether this is a welcome feature before proceeding with all of the above. Thanks, Ilya Gladyshev
From 2cb72df03ed94d55cf51531a2d21a7d3369ae27b Mon Sep 17 00:00:00 2001 From: Ilya Gladyshev <ilya.v.gladys...@gmail.com> Date: Sat, 19 Nov 2022 17:47:49 +0400 Subject: [PATCH] postgres_fdw libpq binary proto support --- contrib/postgres_fdw/option.c | 6 +- contrib/postgres_fdw/postgres_fdw.c | 389 ++++++++++++++++++++++++---- contrib/postgres_fdw/postgres_fdw.h | 1 + 3 files changed, 338 insertions(+), 58 deletions(-) diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index fa80ee2a55..f96cb79b42 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -125,7 +125,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) strcmp(def->defname, "truncatable") == 0 || strcmp(def->defname, "async_capable") == 0 || strcmp(def->defname, "parallel_commit") == 0 || - strcmp(def->defname, "keep_connections") == 0) + strcmp(def->defname, "keep_connections") == 0 || + strcmp(def->defname, "binary_format") == 0) { /* these accept only boolean values */ (void) defGetBoolean(def); @@ -253,6 +254,9 @@ InitPgFdwOptions(void) /* async_capable is available on both server and table */ {"async_capable", ForeignServerRelationId, false}, {"async_capable", ForeignTableRelationId, false}, + /* async_capable is available on both server and table */ + {"binary_format", ForeignServerRelationId, false}, + {"binary_format", ForeignTableRelationId, false}, {"parallel_commit", ForeignServerRelationId, false}, {"keep_connections", ForeignServerRelationId, false}, {"password_required", UserMappingRelationId, false}, diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 8d7500abfb..9344b6f5fc 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -76,6 +76,8 @@ enum FdwScanPrivateIndex FdwScanPrivateRetrievedAttrs, /* Integer representing the desired fetch_size */ FdwScanPrivateFetchSize, + /* Boolean flag showing whether to use binary or text libpq protocol */ + 
FdwScanPrivateBinaryFormat, /* * String describing join i.e. names of relations being joined and types @@ -128,7 +130,8 @@ enum FdwDirectModifyPrivateIndex /* Integer list of attribute numbers retrieved by RETURNING */ FdwDirectModifyPrivateRetrievedAttrs, /* set-processed flag (as a Boolean node) */ - FdwDirectModifyPrivateSetProcessed + FdwDirectModifyPrivateSetProcessed, + FdwDirectModifyPrivateBinaryFormat }; /* @@ -154,6 +157,7 @@ typedef struct PgFdwScanState FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ const char **param_values; /* textual values of query parameters */ + int *param_lengths; /* for storing result tuples */ HeapTuple *tuples; /* array of currently-retrieved tuples */ @@ -172,6 +176,7 @@ typedef struct PgFdwScanState MemoryContext temp_cxt; /* context for per-tuple temporary data */ int fetch_size; /* number of tuples per fetch */ + bool binary_format; /* whether to use libpq binary or text format */ } PgFdwScanState; /* @@ -195,6 +200,7 @@ typedef struct PgFdwModifyState int batch_size; /* value of FDW option "batch_size" */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + bool binary_format; /* info about parameters for prepared statement */ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ @@ -225,7 +231,7 @@ typedef struct PgFdwDirectModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ bool set_processed; /* do we set the command es_processed? 
*/ - + bool binary_format; /* for remote query execution */ PGconn *conn; /* connection for the update */ PgFdwConnState *conn_state; /* extra per-connection state */ @@ -233,6 +239,7 @@ typedef struct PgFdwDirectModifyState FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ const char **param_values; /* textual values of query parameters */ + int *param_lengths; /* for storing result tuples */ PGresult *result; /* result for query */ @@ -256,6 +263,7 @@ typedef struct PgFdwAnalyzeState Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ List *retrieved_attrs; /* attr numbers retrieved by query */ + bool binary_format; /* collected sample rows */ HeapTuple *rows; /* array of size targrows */ @@ -470,7 +478,8 @@ static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot **slots, - int numSlots); + int numSlots, + int **p_lengths); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); static void finish_foreign_modify(PgFdwModifyState *fmstate); @@ -492,11 +501,15 @@ static void prepare_query_params(PlanState *node, int numParams, FmgrInfo **param_flinfo, List **param_exprs, - const char ***param_values); + const char ***param_values, + int **param_lengths, + bool binary_format); static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, - const char **param_values); + const char **param_values, + int *param_lengths, + bool binary_format); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, @@ -512,7 +525,8 @@ static HeapTuple make_tuple_from_result_row(PGresult *res, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, - 
MemoryContext temp_context); + MemoryContext temp_context, + bool binary_format); static void conversion_error_callback(void *arg); static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, @@ -540,8 +554,14 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo); static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); -static int get_batch_size_option(Relation rel); +static bool get_binary_format_option(ForeignServer *server, ForeignTable *table); +static int get_batch_size_option(ForeignServer *server, ForeignTable *table); +static ForeignScan *find_modifytable_subplan(PlannerInfo *root, + ModifyTable *plan, + Index rtindex, + int subplan_index); +static AttInMetadata *TupleDescGetAttInBinaryMetadata(TupleDesc tupdesc); /* * Foreign-data wrapper handler function: return a struct with pointers @@ -1404,9 +1424,10 @@ postgresGetForeignPlan(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match order in enum FdwScanPrivateIndex. */ - fdw_private = list_make3(makeString(sql.data), + fdw_private = list_make4(makeString(sql.data), retrieved_attrs, - makeInteger(fpinfo->fetch_size)); + makeInteger(fpinfo->fetch_size), + makeBoolean(fpinfo->binary_format)); if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name)); @@ -1542,6 +1563,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) FdwScanPrivateRetrievedAttrs); fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateFetchSize)); + fsstate->binary_format = boolVal(list_nth(fsplan->fdw_private, + FdwScanPrivateBinaryFormat)); /* Create contexts for batches of tuples and per-tuple temp workspace. 
*/ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, @@ -1566,8 +1589,10 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node); } - fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); - + if (fsstate->binary_format) + fsstate->attinmeta = TupleDescGetAttInBinaryMetadata(fsstate->tupdesc); + else + fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); /* * Prepare for processing of parameters used in remote query, if any. */ @@ -1579,7 +1604,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) numParams, &fsstate->param_flinfo, &fsstate->param_exprs, - &fsstate->param_values); + &fsstate->param_values, + &fsstate->param_lengths, + fsstate->binary_format); /* Set the async-capable flag */ fsstate->async_capable = node->ss.ps.async_capable; @@ -2038,7 +2065,15 @@ postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) if (fmstate) batch_size = fmstate->batch_size; else - batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc); + { + ForeignServer *fs; + ForeignTable *ft; + + ft = GetForeignTable(RelationGetRelid(resultRelInfo->ri_RelationDesc)); + fs = GetForeignServer(ft->serverid); + + batch_size = get_batch_size_option(fs, ft); + } /* * Disable batching when we have to use RETURNING, there are any @@ -2593,10 +2628,11 @@ postgresPlanDirectModify(PlannerInfo *root, * Update the fdw_private list that will be available to the executor. * Items in the list must match enum FdwDirectModifyPrivateIndex, above. */ - fscan->fdw_private = list_make4(makeString(sql.data), + fscan->fdw_private = list_make5(makeString(sql.data), makeBoolean((retrieved_attrs != NIL)), retrieved_attrs, - makeBoolean(plan->canSetTag)); + makeBoolean(plan->canSetTag), + makeBoolean(fpinfo->binary_format)); /* * Update the foreign-join-related fields. 
@@ -2701,6 +2737,8 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private, FdwDirectModifyPrivateSetProcessed)); + dmstate->binary_format = boolVal(list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateBinaryFormat)); /* Create context for per-tuple temp workspace. */ dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", @@ -2716,7 +2754,10 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) else tupdesc = RelationGetDescr(dmstate->rel); - dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + if (dmstate->binary_format) + dmstate->attinmeta = TupleDescGetAttInBinaryMetadata(tupdesc); + else + dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); /* * When performing an UPDATE/DELETE .. RETURNING on a join directly, @@ -2738,7 +2779,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) numParams, &dmstate->param_flinfo, &dmstate->param_exprs, - &dmstate->param_values); + &dmstate->param_values, + &dmstate->param_lengths, + dmstate->binary_format); } /* @@ -3711,9 +3754,11 @@ create_cursor(ForeignScanState *node) ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = fsstate->numParams; const char **values = fsstate->param_values; + int *formats = NULL; PGconn *conn = fsstate->conn; StringInfoData buf; PGresult *res; + bool binary = fsstate->binary_format; /* First, process a pending asynchronous request, if any. 
*/ if (fsstate->conn_state->pendingAreq) @@ -3733,15 +3778,28 @@ create_cursor(ForeignScanState *node) process_query_params(econtext, fsstate->param_flinfo, fsstate->param_exprs, - values); + values, + fsstate->param_lengths, + binary); MemoryContextSwitchTo(oldcontext); + + if (binary) + { + int i; + + formats = palloc(sizeof(int) * numParams); + for (i = 0; i < numParams; i++) + formats[i] = 1; + } } /* Construct the DECLARE CURSOR command */ initStringInfo(&buf); - appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", - fsstate->cursor_number, fsstate->query); + appendStringInfo(&buf, "DECLARE c%u %sCURSOR FOR\n%s", + fsstate->cursor_number, + binary ? "BINARY " : "", + fsstate->query); /* * Notice that we pass NULL for paramTypes, thus forcing the remote server @@ -3751,7 +3809,7 @@ create_cursor(ForeignScanState *node) * server has the same OIDs we do for the parameters' types. */ if (!PQsendQueryParams(conn, buf.data, numParams, - NULL, values, NULL, NULL, 0)) + NULL, values, fsstate->param_lengths, formats, binary ? 1 : 0)) pgfdw_report_error(ERROR, NULL, conn, false, buf.data); /* @@ -3848,7 +3906,8 @@ fetch_more_data(ForeignScanState *node) fsstate->attinmeta, fsstate->retrieved_attrs, node, - fsstate->temp_cxt); + fsstate->temp_cxt, + fsstate->binary_format); } /* Update fetch_ct_2 */ @@ -3969,11 +4028,13 @@ create_foreign_modify(EState *estate, TupleDesc tupdesc = RelationGetDescr(rel); Oid userid; ForeignTable *table; + ForeignServer *server; UserMapping *user; AttrNumber n_params; Oid typefnoid; bool isvarlena; ListCell *lc; + bool binary_format; /* Begin constructing PgFdwModifyState. */ fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); @@ -3987,6 +4048,7 @@ create_foreign_modify(EState *estate, /* Get info about foreign table. */ table = GetForeignTable(RelationGetRelid(rel)); + server = GetForeignServer(table->serverid); user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. 
*/ @@ -4000,6 +4062,9 @@ create_foreign_modify(EState *estate, fmstate->query = pstrdup(fmstate->query); fmstate->orig_query = pstrdup(fmstate->query); } + binary_format = get_binary_format_option(server, table); + + fmstate->binary_format = binary_format; fmstate->target_attrs = target_attrs; fmstate->values_end = values_end; fmstate->has_returning = has_returning; @@ -4012,7 +4077,12 @@ create_foreign_modify(EState *estate, /* Prepare for input conversion of RETURNING results. */ if (fmstate->has_returning) - fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + { + if (binary_format) + fmstate->attinmeta = TupleDescGetAttInBinaryMetadata(tupdesc); + else + fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + } /* Prepare for output conversion of parameters used in prepared stmt. */ n_params = list_length(fmstate->target_attrs) + 1; @@ -4030,7 +4100,10 @@ create_foreign_modify(EState *estate, elog(ERROR, "could not find junk ctid column"); /* First transmittable parameter will be ctid */ - getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); + if (binary_format) + getTypeBinaryOutputInfo(TIDOID, &typefnoid, &isvarlena); + else + getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; } @@ -4048,7 +4121,11 @@ create_foreign_modify(EState *estate, /* Ignore generated columns; they are set to DEFAULT */ if (attr->attgenerated) continue; - getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + + if (binary_format) + getTypeBinaryOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + else + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); fmstate->p_nums++; } @@ -4058,7 +4135,7 @@ create_foreign_modify(EState *estate, /* Set batch_size from foreign server/table options. 
*/ if (operation == CMD_INSERT) - fmstate->batch_size = get_batch_size_option(rel); + fmstate->batch_size = get_batch_size_option(server, table); fmstate->num_slots = 1; @@ -4088,6 +4165,9 @@ execute_foreign_modify(EState *estate, const char **p_values; PGresult *res; int n_rows; + int *p_formats = NULL; + int numParams = fmstate->p_nums * (*numSlots); + int *p_lengths = NULL; StringInfoData sql; /* The operation should be INSERT, UPDATE, or DELETE */ @@ -4142,7 +4222,17 @@ execute_foreign_modify(EState *estate, } /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots, + &p_lengths); + + if (fmstate->binary_format) + { + int i; + + p_formats = palloc(sizeof(int) * numParams); + for (i = 0; i < numParams; i++) + p_formats[i] = 1; + } /* * Execute the prepared statement. @@ -4151,9 +4241,9 @@ execute_foreign_modify(EState *estate, fmstate->p_name, fmstate->p_nums * (*numSlots), p_values, - NULL, - NULL, - 0)) + p_lengths, + p_formats, + fmstate->binary_format ? 
1 : 0)) pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); /* @@ -4254,17 +4344,24 @@ static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot **slots, - int numSlots) + int numSlots, + int **param_lengths) { const char **p_values; int i; int j; int pindex = 0; MemoryContext oldcontext; + int *p_lengths = NULL; + oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); + if (fmstate->binary_format) + p_lengths = palloc(sizeof(int) * fmstate->p_nums * numSlots); + + *param_lengths = p_lengths; /* ctid is provided only for UPDATE/DELETE, which don't allow batching */ Assert(!(tupleid != NULL && numSlots > 1)); @@ -4274,8 +4371,18 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate, { Assert(numSlots == 1); /* don't need set_transmission_modes for TID output */ - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], - PointerGetDatum(tupleid)); + if (fmstate->binary_format) + { + bytea *val; + + val = SendFunctionCall(&fmstate->p_flinfo[pindex], + PointerGetDatum(tupleid)); + p_values[pindex] = VARDATA(val); + p_lengths[pindex] = VARSIZE(val) - VARHDRSZ; + } + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + PointerGetDatum(tupleid)); pindex++; } @@ -4303,7 +4410,21 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate, continue; value = slot_getattr(slots[i], attnum, &isnull); if (isnull) + { p_values[pindex] = NULL; + + /* Binary params are parsed as NULL if length is -1 */ + if (fmstate->binary_format) + p_lengths[pindex] = -1; + } + else if (fmstate->binary_format) + { + bytea *val; + + val = SendFunctionCall(&fmstate->p_flinfo[j], value); + p_values[pindex] = VARDATA(val); + p_lengths[pindex] = VARSIZE(val) - VARHDRSZ; + } else p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j], value); @@ -4342,7 +4463,8 @@ store_returning_result(PgFdwModifyState *fmstate, 
fmstate->attinmeta, fmstate->retrieved_attrs, NULL, - fmstate->temp_cxt); + fmstate->temp_cxt, + fmstate->binary_format); /* * The returning slot will not necessarily be suitable to store @@ -4543,6 +4665,8 @@ execute_dml_stmt(ForeignScanState *node) ExprContext *econtext = node->ss.ps.ps_ExprContext; int numParams = dmstate->numParams; const char **values = dmstate->param_values; + int *formats = NULL; + bool binary = dmstate->binary_format; /* First, process a pending asynchronous request, if any. */ if (dmstate->conn_state->pendingAreq) @@ -4552,11 +4676,24 @@ execute_dml_stmt(ForeignScanState *node) * Construct array of query parameter values in text format. */ if (numParams > 0) + { process_query_params(econtext, dmstate->param_flinfo, dmstate->param_exprs, - values); + values, + dmstate->param_lengths, + dmstate->binary_format); + + if (binary) + { + int i; + formats = palloc(sizeof(int) * numParams); + for (i = 0; i < numParams; i++) + formats[i] = 1; + } + + } /* * Notice that we pass NULL for paramTypes, thus forcing the remote server * to infer types for all parameters. Since we explicitly cast every @@ -4565,7 +4702,7 @@ execute_dml_stmt(ForeignScanState *node) * server has the same OIDs we do for the parameters' types. */ if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, - NULL, values, NULL, NULL, 0)) + NULL, values, dmstate->param_lengths, formats, binary ? 
1 : 0)) pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); /* @@ -4635,7 +4772,8 @@ get_returning_data(ForeignScanState *node) dmstate->attinmeta, dmstate->retrieved_attrs, node, - dmstate->temp_cxt); + dmstate->temp_cxt, + dmstate->binary_format); ExecStoreHeapTuple(newtup, slot, false); } PG_CATCH(); @@ -4841,7 +4979,9 @@ prepare_query_params(PlanState *node, int numParams, FmgrInfo **param_flinfo, List **param_exprs, - const char ***param_values) + const char ***param_values, + int **param_lengths, + bool binary_format) { int i; ListCell *lc; @@ -4857,8 +4997,14 @@ prepare_query_params(PlanState *node, Node *param_expr = (Node *) lfirst(lc); Oid typefnoid; bool isvarlena; - - getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); + if (binary_format) + { + getTypeBinaryOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); + } + else + { + getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); + } fmgr_info(typefnoid, &(*param_flinfo)[i]); i++; } @@ -4875,6 +5021,8 @@ prepare_query_params(PlanState *node, /* Allocate buffer for text form of query parameters. */ *param_values = (const char **) palloc0(numParams * sizeof(char *)); + if (binary_format) + *param_lengths = (int *) palloc0(numParams * sizeof(int)); } /* @@ -4884,7 +5032,9 @@ static void process_query_params(ExprContext *econtext, FmgrInfo *param_flinfo, List *param_exprs, - const char **param_values) + const char **param_values, + int *param_lengths, + bool binary_format) { int nestlevel; int i; @@ -4907,7 +5057,19 @@ process_query_params(ExprContext *econtext, * type-specific output function, unless the value is null. 
*/ if (isNull) + { param_values[i] = NULL; + + /* Binary params are parsed as NULL if length is -1 */ + if (binary_format) + param_lengths[i] = -1; + } + else if (binary_format) + { + bytea *val = SendFunctionCall(¶m_flinfo[i], expr_value); + param_values[i] = VARDATA(val); + param_lengths[i] = VARSIZE(val) - VARHDRSZ; + } else param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value); @@ -5011,7 +5173,6 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, /* Initialize workspace state */ astate.rel = relation; - astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation)); astate.rows = rows; astate.targrows = targrows; @@ -5035,12 +5196,17 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, user = GetUserMapping(relation->rd_rel->relowner, table->serverid); conn = GetConnection(user, false, NULL); + astate.binary_format = get_binary_format_option(server, table); + if (astate.binary_format) + astate.attinmeta = TupleDescGetAttInBinaryMetadata(RelationGetDescr(relation)); + else + astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));; /* * Construct cursor that retrieves whole rows from remote. */ cursor_number = GetCursorNumber(conn); initStringInfo(&sql); - appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number); + appendStringInfo(&sql, "DECLARE c%u %sCURSOR FOR ", cursor_number, astate.binary_format ? "BINARY " : ""); deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs); /* In what follows, do not risk leaking any PGresults. 
*/ @@ -5212,7 +5378,8 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) astate->attinmeta, astate->retrieved_attrs, NULL, - astate->temp_cxt); + astate->temp_cxt, + astate->binary_format); MemoryContextSwitchTo(oldcontext); } @@ -5922,6 +6089,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo) (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); else if (strcmp(def->defname, "async_capable") == 0) fpinfo->async_capable = defGetBoolean(def); + else if (strcmp(def->defname, "binary_format") == 0) + fpinfo->binary_format = defGetBoolean(def); } } @@ -5945,6 +6114,9 @@ apply_table_options(PgFdwRelationInfo *fpinfo) (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); else if (strcmp(def->defname, "async_capable") == 0) fpinfo->async_capable = defGetBoolean(def); + else if (strcmp(def->defname, "binary_format") == 0) + fpinfo->binary_format = defGetBoolean(def); + } } @@ -5980,6 +6152,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; fpinfo->async_capable = fpinfo_o->async_capable; + fpinfo->binary_format = fpinfo_o->binary_format; /* Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -6011,6 +6184,9 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, */ fpinfo->async_capable = fpinfo_o->async_capable || fpinfo_i->async_capable; + + /* Join will use binary proto only if both base rels are configured to use it. 
*/ + fpinfo->binary_format = fpinfo_o->binary_format && fpinfo->binary_format; } } @@ -7209,7 +7385,8 @@ make_tuple_from_result_row(PGresult *res, AttInMetadata *attinmeta, List *retrieved_attrs, ForeignScanState *fsstate, - MemoryContext temp_context) + MemoryContext temp_context, + bool binary_format) { HeapTuple tuple; TupleDesc tupdesc; @@ -7221,6 +7398,10 @@ make_tuple_from_result_row(PGresult *res, MemoryContext oldcontext; ListCell *lc; int j; + StringInfo buf = NULL; + + if (binary_format) + buf = makeStringInfo(); Assert(row < PQntuples(res)); @@ -7274,6 +7455,11 @@ make_tuple_from_result_row(PGresult *res, else valstr = PQgetvalue(res, row, j); + if (binary_format && valstr != NULL) + { + resetStringInfo(buf); + appendBinaryStringInfo(buf, valstr, PQgetlength(res, row, j)); + } /* * convert value to internal representation * @@ -7286,10 +7472,20 @@ make_tuple_from_result_row(PGresult *res, Assert(i <= tupdesc->natts); nulls[i - 1] = (valstr == NULL); /* Apply the input function even to nulls, to support domains */ - values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], - valstr, - attinmeta->attioparams[i - 1], - attinmeta->atttypmods[i - 1]); + if (binary_format) + { + values[i - 1] = ReceiveFunctionCall(&attinmeta->attinfuncs[i - 1], + nulls[i - 1] ? 
NULL : buf, + attinmeta->attioparams[i - 1], + attinmeta->atttypmods[i - 1]); + } + else + { + values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], + valstr, + attinmeta->attioparams[i - 1], + attinmeta->atttypmods[i - 1]); + } } else if (i == SelfItemPointerAttributeNumber) { @@ -7298,7 +7494,10 @@ make_tuple_from_result_row(PGresult *res, { Datum datum; - datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); + if (binary_format) + datum = DirectFunctionCall1(tidrecv, PointerGetDatum(buf)); + else + datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); ctid = (ItemPointer) DatumGetPointer(datum); } } @@ -7556,16 +7755,46 @@ find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec, return NULL; } + +static bool +get_binary_format_option(ForeignServer *server, ForeignTable *table) +{ + List *options; + ListCell *lc; + + /* By default, text protocol is used */ + bool binary_format = false; + + /* + * Load options for table and server. We append server options after table + * options, because table options take precedence. + */ + options = NIL; + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies batch_size. */ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "binary_format") == 0) + { + (void) parse_bool(defGetString(def), &binary_format); + break; + } + } + list_free(options); + return binary_format; +} + /* * Determine batch size for a given foreign table. The option specified for * a table has precedence. */ static int -get_batch_size_option(Relation rel) +get_batch_size_option(ForeignServer *server, ForeignTable *table) { - Oid foreigntableid = RelationGetRelid(rel); - ForeignTable *table; - ForeignServer *server; List *options; ListCell *lc; @@ -7576,9 +7805,6 @@ get_batch_size_option(Relation rel) * Load options for table and server. 
We append server options after table * options, because table options take precedence. */ - table = GetForeignTable(foreigntableid); - server = GetForeignServer(table->serverid); - options = NIL; options = list_concat(options, table->options); options = list_concat(options, server->options); @@ -7595,5 +7821,54 @@ get_batch_size_option(Relation rel) } } + list_free(options); return batch_size; } + +/* + * pTupleDescGetAttInBinaryMetadata - Basically a copy of TupleDescGetAttInMetadata + * where input function is replaced binary input function. + */ +static AttInMetadata * +TupleDescGetAttInBinaryMetadata(TupleDesc tupdesc) +{ + int natts = tupdesc->natts; + int i; + Oid atttypeid; + Oid attinfuncid; + FmgrInfo *attinfuncinfo; + Oid *attioparams; + int32 *atttypmods; + AttInMetadata *attinmeta; + + attinmeta = (AttInMetadata *) palloc(sizeof(AttInMetadata)); + + /* "Bless" the tupledesc so that we can make rowtype datums with it */ + attinmeta->tupdesc = BlessTupleDesc(tupdesc); + + /* + * Gather info needed later to call the "in" function for each attribute + */ + attinfuncinfo = (FmgrInfo *) palloc0(natts * sizeof(FmgrInfo)); + attioparams = (Oid *) palloc0(natts * sizeof(Oid)); + atttypmods = (int32 *) palloc0(natts * sizeof(int32)); + + for (i = 0; i < natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + /* Ignore dropped attributes */ + if (!att->attisdropped) + { + atttypeid = att->atttypid; + getTypeBinaryInputInfo(atttypeid, &attinfuncid, &attioparams[i]); + fmgr_info(attinfuncid, &attinfuncinfo[i]); + atttypmods[i] = att->atttypmod; + } + } + attinmeta->attinfuncs = attinfuncinfo; + attinmeta->attioparams = attioparams; + attinmeta->atttypmods = atttypmods; + + return attinmeta; +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index a11d45bedf..993d3f026a 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -80,6 +80,7 @@ typedef struct PgFdwRelationInfo 
Cost fdw_tuple_cost; List *shippable_extensions; /* OIDs of shippable extensions */ bool async_capable; + bool binary_format; /* Cached catalog information. */ ForeignTable *table; -- 2.30.2