Hi, While working on a fix related to non-direct DML [1], I noticed that postgresExecForeignInsert(), postgresExecForeignUpdate() and postgresExecForeignDelete() functions are almost identical except that postgresExecForeignInsert() doesn't require ctid. The fix that I was working is applicable to Delete and Update but can be useful for Insert as well. I had to add the same code to two places at least and might have missed fixing one of them. Why don't we have a single function which prepares the statement, extract parameters, executes the prepared statement and checks for the results, returned rows etc? It's been a while that these functions are there and haven't produced code which is a lot different for each of these cases. Here's a patch to extract that code into a separate function and use it in all the three hook implementations.
[1] https://www.postgresql.org/message-id/CAFjFpRfcgwsHRmpvoOK-GUQi-n8MgAS+OxcQo=abdn1coyw...@mail.gmail.com -- Best Wishes, Ashutosh Bapat EnterpriseDB Corporation The Postgres Database Company
From 2b7156230423a9f58f2f8ee27ab24aed1156605d Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat <ashutosh.ba...@enterprisedb.com> Date: Wed, 18 Apr 2018 15:52:32 +0530 Subject: [PATCH 2/2] Refactor postgres_fdw's DML execution hook functions postgresExecForeignInsert(), postgresExecForeignUpdate() and postgresExecForeignDelete() functions are almost identical except that postgresExecForeignInsert() doesn't require ctid. Extract that code into a separate function and use it in all the three hook implementations. Ashutosh Bapat. --- contrib/postgres_fdw/postgres_fdw.c | 189 ++++++++++------------------------- 1 file changed, 54 insertions(+), 135 deletions(-) diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 30e5726..acf72e6 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -25,6 +25,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/parsenodes.h" #include "optimizer/cost.h" #include "optimizer/clauses.h" #include "optimizer/pathnode.h" @@ -305,6 +306,10 @@ static void postgresBeginForeignModify(ModifyTableState *mtstate, List *fdw_private, int subplan_index, int eflags); +static TupleTableSlot *perform_one_foreign_dml(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, TupleTableSlot *planslot, + CmdType commandtype); static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, @@ -1732,26 +1737,50 @@ postgresBeginForeignModify(ModifyTableState *mtstate, } /* - * postgresExecForeignInsert - * Insert one row into a foreign table + * perform_one_foreign_dml + * Execute a prepared DML statement once on the foreign server using + * parameters from the given plan slot. The execution is expected to + * affect only one row on the foreign server. */ static TupleTableSlot * -postgresExecForeignInsert(EState *estate, - ResultRelInfo *resultRelInfo, - TupleTableSlot *slot, - TupleTableSlot *planSlot) +perform_one_foreign_dml(EState *estate, ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, TupleTableSlot *planSlot, + CmdType commandtype) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + ItemPointer ctid; const char **p_values; PGresult *res; int n_rows; + Assert(commandtype == CMD_INSERT || + commandtype == CMD_UPDATE || + commandtype == CMD_DELETE); + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); + if (commandtype == CMD_UPDATE || commandtype == CMD_DELETE) + { + Datum datum; + bool isNull; + + /* Get the ctid that was passed up as a resjunk column */ + datum = ExecGetJunkAttribute(planSlot, + fmstate->ctidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "ctid is NULL"); + + ctid = (ItemPointer) DatumGetPointer(datum); + } + else + ctid = NULL; + /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, NULL, slot); + p_values = convert_prep_stmt_params(fmstate, ctid, slot); /* * Execute the prepared statement. @@ -1796,6 +1825,20 @@ postgresExecForeignInsert(EState *estate, } /* + * postgresExecForeignInsert + * Insert one row into a foreign table + */ +static TupleTableSlot * +postgresExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + return perform_one_foreign_dml(estate, resultRelInfo, slot, planSlot, + CMD_INSERT); +} + +/* * postgresExecForeignUpdate * Update one row in a foreign table */ @@ -1805,70 +1848,8 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - Datum datum; - bool isNull; - const char **p_values; - PGresult *res; - int n_rows; - - /* Set up the prepared statement on the remote server, if we didn't yet */ - if (!fmstate->p_name) - prepare_foreign_modify(fmstate); - - /* Get the ctid that was passed up as a resjunk column */ - datum = ExecGetJunkAttribute(planSlot, - fmstate->ctidAttno, - &isNull); - /* shouldn't ever get a null result... */ - if (isNull) - elog(ERROR, "ctid is NULL"); - - /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, - (ItemPointer) DatumGetPointer(datum), - slot); - - /* - * Execute the prepared statement. - */ - if (!PQsendQueryPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); - - /* - * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); - if (PQresultStatus(res) != - (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); - - /* Check number of rows affected, and fetch RETURNING tuple if any */ - if (fmstate->has_returning) - { - n_rows = PQntuples(res); - if (n_rows > 0) - store_returning_result(fmstate, slot, res); - } - else - n_rows = atoi(PQcmdTuples(res)); - - /* And clean up */ - PQclear(res); - - MemoryContextReset(fmstate->temp_cxt); - - /* Return NULL if nothing was updated on the remote end */ - return (n_rows > 0) ? slot : NULL; + return perform_one_foreign_dml(estate, resultRelInfo, slot, planSlot, + CMD_UPDATE); } /* @@ -1881,70 +1862,8 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - Datum datum; - bool isNull; - const char **p_values; - PGresult *res; - int n_rows; - - /* Set up the prepared statement on the remote server, if we didn't yet */ - if (!fmstate->p_name) - prepare_foreign_modify(fmstate); - - /* Get the ctid that was passed up as a resjunk column */ - datum = ExecGetJunkAttribute(planSlot, - fmstate->ctidAttno, - &isNull); - /* shouldn't ever get a null result... */ - if (isNull) - elog(ERROR, "ctid is NULL"); - - /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, - (ItemPointer) DatumGetPointer(datum), - NULL); - - /* - * Execute the prepared statement. - */ - if (!PQsendQueryPrepared(fmstate->conn, - fmstate->p_name, - fmstate->p_nums, - p_values, - NULL, - NULL, - 0)) - pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); - - /* - * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); - if (PQresultStatus(res) != - (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) - pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); - - /* Check number of rows affected, and fetch RETURNING tuple if any */ - if (fmstate->has_returning) - { - n_rows = PQntuples(res); - if (n_rows > 0) - store_returning_result(fmstate, slot, res); - } - else - n_rows = atoi(PQcmdTuples(res)); - - /* And clean up */ - PQclear(res); - - MemoryContextReset(fmstate->temp_cxt); - - /* Return NULL if nothing was deleted on the remote end */ - return (n_rows > 0) ? slot : NULL; + return perform_one_foreign_dml(estate, resultRelInfo, slot, planSlot, + CMD_DELETE); } /* -- 1.7.9.5