Attached is a WIP patch that addresses the following comment in postgres_fdw.c:
/*
* postgresPlanForeignModify
* Plan an insert/update/delete operation on a foreign table
*
* Note: currently, the plan tree generated for UPDATE/DELETE will always
* include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
* and then the ModifyTable node will have to execute individual remote
* UPDATE/DELETE commands. If there are no local conditions or joins
* needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
* and then do nothing at ModifyTable. Room for future optimization ...
*/
In the patch, postgresPlanForeignModify has been modified so that, if the
following conditions are also satisfied in addition to the one above, the
ForeignScan and ModifyTable nodes work that way:
- There are no local row-level BEFORE/AFTER triggers.
- For UPDATE, the expressions assigned to the target columns are safe to
evaluate on the remote server.
Here is a simple performance test.
On remote side:
postgres=# create table t (id serial primary key, inserted timestamp
default clock_timestamp(), data text);
CREATE TABLE
postgres=# insert into t(data) select random() from generate_series(0,
99999);
INSERT 0 100000
postgres=# vacuum t;
VACUUM
On local side:
postgres=# create foreign table ft (id integer, inserted timestamp, data
text) server myserver options (table_name 't');
CREATE FOREIGN TABLE
Unpatched:
postgres=# explain analyze verbose delete from ft where id < 10000;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Delete on public.ft (cost=100.00..162.32 rows=910 width=6) (actual
time=1275.255..1275.255 rows=0 loops=1)
Remote SQL: DELETE FROM public.t WHERE ctid = $1
-> Foreign Scan on public.ft (cost=100.00..162.32 rows=910 width=6)
(actual time=1.180..52.095 rows=9999 loops=1)
Output: ctid
Remote SQL: SELECT ctid FROM public.t WHERE ((id < 10000)) FOR
UPDATE
Planning time: 0.112 ms
Execution time: 1275.733 ms
(7 rows)
Patched (Note that the DELETE command has been pushed down.):
postgres=# explain analyze verbose delete from ft where id < 10000;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------
Delete on public.ft (cost=100.00..162.32 rows=910 width=6) (actual
time=0.006..0.006 rows=0 loops=1)
-> Foreign Scan on public.ft (cost=100.00..162.32 rows=910 width=6)
(actual time=0.001..0.001 rows=0 loops=1)
Output: ctid
Remote SQL: DELETE FROM public.t WHERE ((id < 10000))
Planning time: 0.101 ms
Execution time: 8.808 ms
(6 rows)
I'll add this to the next CF. Comments are welcome.
Thanks,
Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/deparse.c
--- b/contrib/postgres_fdw/deparse.c
***************
*** 189,198 **** is_foreign_expr(PlannerInfo *root,
if (!foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt))
return false;
- /* Expressions examined here should be boolean, ie noncollatable */
- Assert(loc_cxt.collation == InvalidOid);
- Assert(loc_cxt.state == FDW_COLLATE_NONE);
-
/*
* An expression which includes any mutable functions can't be sent over
* because its result is not stable. For example, sending now() remote
--- 189,194 ----
***************
*** 928,933 **** deparseUpdateSql(StringInfo buf, PlannerInfo *root,
--- 924,982 ----
}
/*
+ * deparse remote UPDATE statement for direct update
+ *
+ * Unlike deparseUpdateSql, which builds a per-row "... WHERE ctid = $1"
+ * command, this builds a single statement that updates every target row on
+ * the remote server.  The SET list is deparsed from the entries of
+ * targetlist whose resnos appear in targetAttrs, and the WHERE clause from
+ * remote_conds.
+ *
+ * The statement text is appended to buf, and we also create an integer List
+ * of the columns being retrieved by RETURNING (if any), which is returned
+ * to *retrieved_attrs.
+ *
+ * Params met while deparsing are numbered $1, $2, ... in one list shared by
+ * the SET and WHERE parts, but that list is not returned to the caller.  The
+ * statement is sent without parameter values, so the caller must only use
+ * this when no Params occur.
+ */
+ void
+ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
+ 					   Index rtindex, Relation rel,
+ 					   List *remote_conds,
+ 					   List *targetlist,
+ 					   List *targetAttrs, List *returningList,
+ 					   List **retrieved_attrs)
+ {
+ 	RelOptInfo *baserel = root->simple_rel_array[rtindex];
+ 	List	   *params_list = NIL;
+ 	deparse_expr_cxt context;
+ 	bool		first;
+ 	ListCell   *lc;
+
+ 	/*
+ 	 * Set up context struct for recursion.  Point params_list at the same
+ 	 * list the WHERE clause uses, so any Param in a SET expression is
+ 	 * collected and numbered consistently with those in remote_conds.
+ 	 * NOTE(review): with NULL here, deparsing a Param would presumably
+ 	 * dereference a null pointer -- confirm against deparseParam.
+ 	 */
+ 	context.root = root;
+ 	context.foreignrel = baserel;
+ 	context.buf = buf;
+ 	context.params_list = &params_list;
+
+ 	appendStringInfoString(buf, "UPDATE ");
+ 	deparseRelation(buf, rel);
+ 	appendStringInfoString(buf, " SET ");
+
+ 	first = true;
+ 	foreach(lc, targetAttrs)
+ 	{
+ 		int			attnum = lfirst_int(lc);
+ 		TargetEntry *tle = get_tle_by_resno(targetlist, attnum);
+
+ 		/* Every updated column must have an entry in the targetlist. */
+ 		if (tle == NULL)
+ 			elog(ERROR, "attribute number %d not found in UPDATE targetlist",
+ 				 attnum);
+
+ 		if (!first)
+ 			appendStringInfoString(buf, ", ");
+ 		first = false;
+
+ 		deparseColumnRef(buf, rtindex, attnum, root);
+ 		appendStringInfoString(buf, " = ");
+ 		deparseExpr((Expr *) tle->expr, &context);
+ 	}
+
+ 	if (remote_conds)
+ 		appendWhereClause(buf, root, baserel, remote_conds,
+ 						  true, &params_list);
+
+ 	deparseReturningList(buf, root, rtindex, rel, false,
+ 						 returningList, retrieved_attrs);
+ }
+
+ /*
* deparse remote DELETE statement
*
* The statement text is appended to buf, and we also create an integer List
***************
*** 950,955 **** deparseDeleteSql(StringInfo buf, PlannerInfo *root,
--- 999,1031 ----
}
/*
+ * deparse remote DELETE statement for direct update
+ *
+ * Unlike deparseDeleteSql, which builds a per-row "... WHERE ctid = $1"
+ * command, this builds a single statement that deletes every target row on
+ * the remote server, with the WHERE clause deparsed from remote_conds.
+ *
+ * The statement text is appended to buf, and we also create an integer List
+ * of the columns being retrieved by RETURNING (if any), which is returned
+ * to *retrieved_attrs.
+ *
+ * Any Params in remote_conds are numbered $1, $2, ... but the list of them
+ * is not returned to the caller.  The statement is sent without parameter
+ * values.
+ */
+ void
+ deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
+ 					   Index rtindex, Relation rel,
+ 					   List *remote_conds,
+ 					   List *returningList,
+ 					   List **retrieved_attrs)
+ {
+ 	RelOptInfo *baserel = root->simple_rel_array[rtindex];
+ 	List	   *params_list = NIL;
+
+ 	appendStringInfoString(buf, "DELETE FROM ");
+ 	deparseRelation(buf, rel);
+
+ 	if (remote_conds)
+ 		appendWhereClause(buf, root, baserel, remote_conds,
+ 						  true, &params_list);
+
+ 	deparseReturningList(buf, root, rtindex, rel, false,
+ 						 returningList, retrieved_attrs);
+ }
+
+ /*
* Add a RETURNING clause, if needed, to an INSERT/UPDATE/DELETE.
*/
static void
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
***************
*** 1124,1138 **** UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9',
c7 = DEFAULT
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
EXPLAIN (verbose, costs off)
DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
! QUERY PLAN
!
----------------------------------------------------------------------------------------
Delete on public.ft2
Output: c1, c4
- Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", c4
-> Foreign Scan on public.ft2
Output: ctid
! Remote SQL: SELECT ctid FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5))
FOR UPDATE
! (6 rows)
DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
c1 | c4
--- 1124,1137 ----
FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
EXPLAIN (verbose, costs off)
DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
! QUERY PLAN
!
--------------------------------------------------------------------------------------------
Delete on public.ft2
Output: c1, c4
-> Foreign Scan on public.ft2
Output: ctid
! Remote SQL: DELETE FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5))
RETURNING "C 1", c4
! (5 rows)
DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
c1 | c4
***************
*** 2227,2233 **** CONTEXT: Remote SQL command: INSERT INTO "S 1"."T 1"("C 1",
c2, c3, c4, c5, c6,
UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive
ERROR: new row for relation "T 1" violates check constraint "c2positive"
DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02
08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo).
! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1
-- Test savepoint/rollback behavior
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
c2 | count
--- 2226,2232 ----
UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive
ERROR: new row for relation "T 1" violates check constraint "c2positive"
DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02
08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo).
! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (- c2) WHERE (("C
1" = 1))
-- Test savepoint/rollback behavior
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
c2 | count
***************
*** 2386,2392 **** savepoint s3;
update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side
ERROR: new row for relation "T 1" violates check constraint "c2positive"
DETAIL: Failing row contains (10, -2, 00010_trig_update_trig_update,
1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0 , foo).
! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1
rollback to savepoint s3;
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
c2 | count
--- 2385,2391 ----
update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side
ERROR: new row for relation "T 1" violates check constraint "c2positive"
DETAIL: Failing row contains (10, -2, 00010_trig_update_trig_update,
1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0 , foo).
! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (-2) WHERE ((c2 =
42)) AND (("C 1" = 10))
rollback to savepoint s3;
select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
c2 | count
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 87,93 **** typedef struct PgFdwRelationInfo
* planner to executor. Currently we store:
*
* 1) SELECT statement text to be sent to the remote server
! * 2) Integer list of attribute numbers retrieved by the SELECT
*
* These items are indexed with the enum FdwScanPrivateIndex, so an item
* can be fetched with list_nth(). For example, to get the SELECT statement:
--- 87,98 ----
* planner to executor. Currently we store:
*
* 1) SELECT statement text to be sent to the remote server
! * 2) List of restriction clauses that can be executed remotely
! * 3) Integer list of attribute numbers retrieved by the SELECT
! * 4) UPDATE/DELETE statement text to be sent to the remote server
! * 5) Boolean flag showing if we set the command es_processed
! * 6) Boolean flag showing if the remote query has a RETURNING clause
! * 7) Integer list of attribute numbers retrieved by RETURNING, if any
*
* These items are indexed with the enum FdwScanPrivateIndex, so an item
* can be fetched with list_nth(). For example, to get the SELECT statement:
***************
*** 97,106 **** enum FdwScanPrivateIndex
{
/* SQL statement to execute remotely (as a String node) */
FdwScanPrivateSelectSql,
! /* Integer list of attribute numbers retrieved by the SELECT */
! FdwScanPrivateRetrievedAttrs
};
/*
* Similarly, this enum describes what's kept in the fdw_private list for
* a ModifyTable node referencing a postgres_fdw foreign table. We store:
--- 102,124 ----
{
/* SQL statement to execute remotely (as a String node) */
FdwScanPrivateSelectSql,
! /* List of restriction clauses that can be executed remotely */
! FdwScanPrivateRemoteConds,
! /* Integer list of attribute numbers retrieved by SELECT */
! FdwScanPrivateRetrievedAttrsBySelect,
! /* UPDATE/DELETE statement to execute remotely (as a String node) */
! FdwScanPrivateUpdateSql,
! /* set-processed flag (as an integer Value node) */
! FdwScanPrivateSetProcessed,
! /* has-returning flag (as an integer Value node) */
! FdwScanPrivateHasReturning,
! /* Integer list of attribute numbers retrieved by RETURNING */
! FdwScanPrivateRetrievedAttrsByReturning
};
+ #define MinFdwScanFdwPrivateLength 3
+ #define MaxFdwScanFdwPrivateLength 7
+
/*
* Similarly, this enum describes what's kept in the fdw_private list for
* a ModifyTable node referencing a postgres_fdw foreign table. We store:
***************
*** 132,139 **** typedef struct PgFdwScanState
AttInMetadata *attinmeta; /* attribute datatype conversion
metadata */
/* extracted fdw_private data */
! char *query; /* text of SELECT command */
List *retrieved_attrs; /* list of retrieved attribute numbers
*/
/* for remote query execution */
PGconn *conn; /* connection for the scan */
--- 150,159 ----
AttInMetadata *attinmeta; /* attribute datatype conversion
metadata */
/* extracted fdw_private data */
! char *query; /* text of SELECT or
UPDATE/DELETE command */
List *retrieved_attrs; /* list of retrieved attribute numbers
*/
+ bool set_processed; /* do we set the command es_processed?
*/
+ bool has_returning; /* is there a RETURNING clause? */
/* for remote query execution */
PGconn *conn; /* connection for the scan */
***************
*** 153,158 **** typedef struct PgFdwScanState
--- 173,183 ----
int fetch_ct_2; /* Min(# of fetches
done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ /* for direct update */
+ bool direct_update; /* do we update the foreign table
directly? */
+ PGresult *result; /* result of an UPDATE/DELETE
query */;
+ TupleTableSlot *rslot; /* slot containing the result tuple */
+
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of
tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data
*/
***************
*** 181,186 **** typedef struct PgFdwModifyState
--- 206,215 ----
int p_nums; /* number of parameters
to transmit */
FmgrInfo *p_flinfo; /* output conversion functions for them
*/
+ /* for direct update */
+ bool direct_update; /* do we update the foreign table
directly? */
+ PgFdwScanState *fsstate; /* execution state of a foreign scan */
+
/* working memory context */
MemoryContext temp_cxt; /* context for per-tuple temporary data
*/
} PgFdwModifyState;
***************
*** 307,318 **** static bool ec_member_matches_foreign(PlannerInfo *root,
RelOptInfo *rel,
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
TupleTableSlot *slot);
! static void store_returning_result(PgFdwModifyState *fmstate,
! TupleTableSlot *slot, PGresult *res);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int
targrows,
double *totalrows,
--- 336,364 ----
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
+ static bool check_direct_update(PlannerInfo *root,
+ ModifyTable
*plan,
+ Index
resultRelation,
+ int
subplan_index,
+ Relation rel,
+ List
*targetAttrs);
+ static List *plan_direct_update(PlannerInfo *root,
+ ModifyTable
*plan,
+ Index
resultRelation,
+ int
subplan_index,
+ Relation rel,
+ List
*targetAttrs);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
TupleTableSlot *slot);
! static void store_returning_result(TupleTableSlot *slot,
! PGresult *res,
! int row,
! Relation rel,
! AttInMetadata *attinmeta,
! List *retrieved_attrs,
! MemoryContext temp_context);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int
targrows,
double *totalrows,
***************
*** 848,854 **** postgresGetForeignPlan(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwScanPrivateIndex, above.
*/
! fdw_private = list_make2(makeString(sql.data),
retrieved_attrs);
/*
--- 894,901 ----
* Build the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwScanPrivateIndex, above.
*/
! fdw_private = list_make3(makeString(sql.data),
! remote_conds,
retrieved_attrs);
/*
***************
*** 910,931 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
server = GetForeignServer(table->serverid);
user = GetUserMapping(userid, server->serverid);
/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
fsstate->conn = GetConnection(server, user, false);
- /* Assign a unique ID for my cursor */
- fsstate->cursor_number = GetCursorNumber(fsstate->conn);
- fsstate->cursor_exists = false;
-
- /* Get private info created by planner functions. */
- fsstate->query = strVal(list_nth(fsplan->fdw_private,
-
FdwScanPrivateSelectSql));
- fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
-
FdwScanPrivateRetrievedAttrs);
-
/* Create contexts for batches of tuples and per-tuple temp workspace.
*/
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
"postgres_fdw tuple data",
--- 957,996 ----
server = GetForeignServer(table->serverid);
user = GetUserMapping(userid, server->serverid);
+ /* Get private info created by planner functions. */
+ if (list_length(fsplan->fdw_private) >= MaxFdwScanFdwPrivateLength)
+ {
+ Assert(list_length(fsplan->fdw_private) ==
MaxFdwScanFdwPrivateLength);
+
+ fsstate->query = strVal(list_nth(fsplan->fdw_private,
+
FdwScanPrivateUpdateSql));
+ fsstate->set_processed = intVal(list_nth(fsplan->fdw_private,
+
FdwScanPrivateSetProcessed));
+ fsstate->has_returning = intVal(list_nth(fsplan->fdw_private,
+
FdwScanPrivateHasReturning));
+ fsstate->retrieved_attrs = (List *)
list_nth(fsplan->fdw_private,
+
FdwScanPrivateRetrievedAttrsByReturning);
+
+ fsstate->direct_update = true;
+ }
+ else
+ {
+ Assert(list_length(fsplan->fdw_private) ==
MinFdwScanFdwPrivateLength);
+
+ fsstate->query = strVal(list_nth(fsplan->fdw_private,
+
FdwScanPrivateSelectSql));
+ fsstate->retrieved_attrs = (List *)
list_nth(fsplan->fdw_private,
+
FdwScanPrivateRetrievedAttrsBySelect);
+
+ fsstate->direct_update = false;
+ }
+
/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
fsstate->conn = GetConnection(server, user, false);
/* Create contexts for batches of tuples and per-tuple temp workspace.
*/
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
"postgres_fdw tuple data",
***************
*** 941,946 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1006,1037 ----
/* Get info we'll need for input data conversion. */
fsstate->attinmeta =
TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
+ if (fsstate->direct_update)
+ {
+ /*
+ * Execute the update statement, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw
error
+ * without releasing the PGresult.
+ */
+ fsstate->result = PQexec(fsstate->conn, fsstate->query);
+ if (PQresultStatus(fsstate->result) !=
+ (fsstate->has_returning ? PGRES_TUPLES_OK :
PGRES_COMMAND_OK))
+ pgfdw_report_error(ERROR, fsstate->result,
fsstate->conn, true,
+ fsstate->query);
+
+ /* Check number of rows affected. */
+ if (fsstate->has_returning)
+ fsstate->num_tuples = PQntuples(fsstate->result);
+ else
+ fsstate->num_tuples =
atoi(PQcmdTuples(fsstate->result));
+ return;
+ }
+
+ /* Assign a unique ID for my cursor */
+ fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+ fsstate->cursor_exists = false;
+
/* Prepare for output conversion of parameters used in remote query. */
numParams = list_length(fsplan->fdw_exprs);
fsstate->numParams = numParams;
***************
*** 990,995 **** postgresIterateForeignScan(ForeignScanState *node)
--- 1081,1129 ----
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ if (fsstate->direct_update)
+ {
+ MemoryContext oldcontext;
+
+ if (!fsstate->has_returning)
+ {
+ EState *estate = node->ss.ps.state;
+
+ if (fsstate->set_processed)
+ estate->es_processed += fsstate->num_tuples;
+ return ExecClearTuple(slot);
+ }
+
+ /* If we didn't get any tuples, must be end of data. */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ return ExecClearTuple(slot);
+
+ /* We'll store RETURNING tuples in the batch_cxt. */
+ oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
+
+ /* Fetch the next tuple. */
+ store_returning_result(slot,
+ fsstate->result,
+ fsstate->next_tuple,
+ fsstate->rel,
+ fsstate->attinmeta,
+
fsstate->retrieved_attrs,
+ fsstate->temp_cxt);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ /* Save the result. */
+ fsstate->rslot = slot;
+ fsstate->next_tuple++;
+
+ /*
+ * Return slot. Note that this is safe because there are no local
+ * conditions and because the tuple contained in the slot is
+ * projected safely and then ignored (see also plan_direct_update).
+ */
+ return slot;
+ }
+
/*
* If this is the first call after Begin or ReScan, we need to create
the
* cursor on the remote side.
***************
*** 1090,1095 **** postgresEndForeignScan(ForeignScanState *node)
--- 1224,1236 ----
if (fsstate == NULL)
return;
+ if (fsstate->direct_update)
+ {
+ /* Clean up */
+ if (fsstate->result)
+ PQclear(fsstate->result);
+ }
+
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
close_cursor(fsstate->conn, fsstate->cursor_number);
***************
*** 1206,1211 **** postgresPlanForeignModify(PlannerInfo *root,
--- 1347,1378 ----
}
/*
+ * For an UPDATE/DELETE command, if there are no local conditions or
joins
+ * needed (see check_direct_update for more details), let the scan node
do
+ * UPDATE/DELETE RETURNING and then do nothing at ModifyTable.
+ */
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ /* Check whether it's safe to do direct update. */
+ if (check_direct_update(root, plan,
+ resultRelation,
+ subplan_index,
+ rel,
targetAttrs))
+ {
+ List *fdw_private;
+
+ /* OK, generate a plan to do direct update. */
+ fdw_private = plan_direct_update(root, plan,
+
resultRelation,
+
subplan_index,
+
rel, targetAttrs);
+
+ heap_close(rel, NoLock);
+ return fdw_private;
+ }
+ }
+
+ /*
* Extract the relevant RETURNING list if any.
*/
if (plan->returningLists)
***************
*** 1284,1289 **** postgresBeginForeignModify(ModifyTableState *mtstate,
--- 1451,1489 ----
fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
fmstate->rel = rel;
+ /* Deconstruct fdw_private data. */
+ fmstate->query = strVal(list_nth(fdw_private,
+
FdwModifyPrivateUpdateSql));
+ fmstate->target_attrs = (List *) list_nth(fdw_private,
+
FdwModifyPrivateTargetAttnums);
+ fmstate->has_returning = intVal(list_nth(fdw_private,
+
FdwModifyPrivateHasReturning));
+ fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
+
FdwModifyPrivateRetrievedAttrs);
+
+ if (fmstate->query == NULL)
+ {
+ PlanState *node = mtstate->mt_plans[subplan_index];
+ PgFdwScanState *fsstate;
+
+ Assert(fmstate->target_attrs == NIL);
+ Assert(fmstate->has_returning == false);
+ Assert(fmstate->retrieved_attrs == NIL);
+
+ Assert(nodeTag(node) == T_ForeignScanState);
+ fsstate = ((ForeignScanState *) node)->fdw_state;
+
+ Assert(fsstate->direct_update);
+ fmstate->direct_update = true;
+ if (fsstate->has_returning)
+ {
+ fmstate->has_returning = true;
+ fmstate->fsstate = fsstate;
+ }
+ resultRelInfo->ri_FdwState = fmstate;
+ return;
+ }
+
/*
* Identify which user to do the remote access as. This should match
what
* ExecCheckRTEPerms() does.
***************
*** 1300,1315 **** postgresBeginForeignModify(ModifyTableState *mtstate,
fmstate->conn = GetConnection(server, user, true);
fmstate->p_name = NULL; /* prepared statement not made yet */
- /* Deconstruct fdw_private data. */
- fmstate->query = strVal(list_nth(fdw_private,
-
FdwModifyPrivateUpdateSql));
- fmstate->target_attrs = (List *) list_nth(fdw_private,
-
FdwModifyPrivateTargetAttnums);
- fmstate->has_returning = intVal(list_nth(fdw_private,
-
FdwModifyPrivateHasReturning));
- fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
-
FdwModifyPrivateRetrievedAttrs);
-
/* Create context for per-tuple temp workspace. */
fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
"postgres_fdw temporary data",
--- 1500,1505 ----
***************
*** 1407,1413 **** postgresExecForeignInsert(EState *estate,
{
n_rows = PQntuples(res);
if (n_rows > 0)
! store_returning_result(fmstate, slot, res);
}
else
n_rows = atoi(PQcmdTuples(res));
--- 1597,1607 ----
{
n_rows = PQntuples(res);
if (n_rows > 0)
! store_returning_result(slot, res, 0,
! fmstate->rel,
!
fmstate->attinmeta,
!
fmstate->retrieved_attrs,
!
fmstate->temp_cxt);
}
else
n_rows = atoi(PQcmdTuples(res));
***************
*** 1438,1443 **** postgresExecForeignUpdate(EState *estate,
--- 1632,1645 ----
PGresult *res;
int n_rows;
+ /* Return slot created in the ForeignScan node when doing direct
update. */
+ if (fmstate->direct_update)
+ {
+ Assert(fmstate->has_returning);
+ Assert(fmstate->fsstate->rslot);
+ return fmstate->fsstate->rslot;
+ }
+
/* Set up the prepared statement on the remote server, if we didn't yet
*/
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
***************
*** 1477,1483 **** postgresExecForeignUpdate(EState *estate,
{
n_rows = PQntuples(res);
if (n_rows > 0)
! store_returning_result(fmstate, slot, res);
}
else
n_rows = atoi(PQcmdTuples(res));
--- 1679,1689 ----
{
n_rows = PQntuples(res);
if (n_rows > 0)
! store_returning_result(slot, res, 0,
! fmstate->rel,
!
fmstate->attinmeta,
!
fmstate->retrieved_attrs,
!
fmstate->temp_cxt);
}
else
n_rows = atoi(PQcmdTuples(res));
***************
*** 1508,1513 **** postgresExecForeignDelete(EState *estate,
--- 1714,1727 ----
PGresult *res;
int n_rows;
+ /* Return slot created in the ForeignScan node when doing direct
update. */
+ if (fmstate->direct_update)
+ {
+ Assert(fmstate->has_returning);
+ Assert(fmstate->fsstate->rslot);
+ return fmstate->fsstate->rslot;
+ }
+
/* Set up the prepared statement on the remote server, if we didn't yet
*/
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
***************
*** 1547,1553 **** postgresExecForeignDelete(EState *estate,
{
n_rows = PQntuples(res);
if (n_rows > 0)
! store_returning_result(fmstate, slot, res);
}
else
n_rows = atoi(PQcmdTuples(res));
--- 1761,1771 ----
{
n_rows = PQntuples(res);
if (n_rows > 0)
! store_returning_result(slot, res, 0,
! fmstate->rel,
!
fmstate->attinmeta,
!
fmstate->retrieved_attrs,
!
fmstate->temp_cxt);
}
else
n_rows = atoi(PQcmdTuples(res));
***************
*** 1575,1580 **** postgresEndForeignModify(EState *estate,
--- 1793,1802 ----
if (fmstate == NULL)
return;
+ /* If doing direct update, there is nothing to do */
+ if (fmstate->direct_update)
+ return;
+
/* If we created a prepared statement, destroy it */
if (fmstate->p_name)
{
***************
*** 1653,1663 **** postgresExplainForeignScan(ForeignScanState *node,
ExplainState *es)
{
List *fdw_private;
char *sql;
if (es->verbose)
{
fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
! sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
ExplainPropertyText("Remote SQL", sql, es);
}
}
--- 1875,1900 ----
{
List *fdw_private;
char *sql;
+ bool direct_update;
if (es->verbose)
{
fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
! if (list_length(fdw_private) >= MaxFdwScanFdwPrivateLength)
! {
! Assert(list_length(fdw_private) ==
MaxFdwScanFdwPrivateLength);
! direct_update = true;
! }
! else
! {
! Assert(list_length(fdw_private) ==
MinFdwScanFdwPrivateLength);
! direct_update = false;
! }
!
! if (direct_update)
! sql = strVal(list_nth(fdw_private,
FdwScanPrivateUpdateSql));
! else
! sql = strVal(list_nth(fdw_private,
FdwScanPrivateSelectSql));
ExplainPropertyText("Remote SQL", sql, es);
}
}
***************
*** 1678,1684 **** postgresExplainForeignModify(ModifyTableState *mtstate,
char *sql = strVal(list_nth(fdw_private,
FdwModifyPrivateUpdateSql));
! ExplainPropertyText("Remote SQL", sql, es);
}
}
--- 1915,1922 ----
char *sql = strVal(list_nth(fdw_private,
FdwModifyPrivateUpdateSql));
! if (sql != NULL)
! ExplainPropertyText("Remote SQL", sql, es);
}
}
***************
*** 1907,1912 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 2145,2338 ----
}
/*
+ * Check whether it's safe to update the foreign table directly.
+ */
+ static bool
+ check_direct_update(PlannerInfo *root,
+ ModifyTable *plan,
+ Index resultRelation,
+ int subplan_index,
+ Relation rel,
+ List *targetAttrs)
+ {
+ RelOptInfo *baserel = root->simple_rel_array[resultRelation];
+ Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
+ ListCell *lc;
+
+ if (rel->trigdesc &&
+ (rel->trigdesc->trig_update_after_row ||
+ rel->trigdesc->trig_update_before_row))
+ return false;
+
+ if (nodeTag(subplan) != T_ForeignScan || subplan->qual != NIL)
+ return false;
+
+ foreach(lc, targetAttrs)
+ {
+ int attnum = lfirst_int(lc);
+ TargetEntry *tle = get_tle_by_resno(subplan->targetlist,
+
attnum);
+
+ if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
+ return false;
+ }
+
+ return true;
+ }
+
+ /*
+ * Generate a plan to update the foreign table directly.
+ */
+ static List *
+ plan_direct_update(PlannerInfo *root,
+ ModifyTable *plan,
+ Index resultRelation,
+ int subplan_index,
+ Relation rel,
+ List *targetAttrs)
+ {
+ CmdType operation = plan->operation;
+ bool canSetTag = plan->canSetTag;
+ Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
+ ForeignScan *fscan = (ForeignScan *) subplan;
+ StringInfoData sql;
+ List *remote_conds;
+ List *returningList = NIL;
+ List *retrieved_attrs = NIL;
+ List *new_tlist = NIL;
+ List *fdw_private;
+
+ Assert(operation == CMD_UPDATE || operation == CMD_DELETE);
+
+ initStringInfo(&sql);
+
+ /*
+ * Extract the baserestrictinfo clauses that can be evaluated remotely.
+ */
+ remote_conds = (List *) list_nth(fscan->fdw_private,
+
FdwScanPrivateRemoteConds);
+
+ /*
+ * Extract the relevant RETURNING list if any.
+ */
+ if (plan->returningLists)
+ returningList = (List *) list_nth(plan->returningLists,
subplan_index);
+
+ /*
+ * Construct the SQL command string.
+ */
+ if (operation == CMD_UPDATE)
+ {
+ List *targetlist = subplan->targetlist;
+
+ deparseDirectUpdateSql(&sql, root, resultRelation, rel,
+ remote_conds,
+ targetlist,
+ targetAttrs,
+ returningList,
+ &retrieved_attrs);
+ }
+ else
+ {
+ Assert(operation == CMD_DELETE);
+
+ deparseDirectDeleteSql(&sql, root, resultRelation, rel,
+ remote_conds,
+ returningList,
+ &retrieved_attrs);
+ }
+
+ /*
+ * Update the fdw_private list that will be available to the executor.
+ * Items in the list must match enum FdwScanPrivateIndex, above.
+ */
+ fscan->fdw_private = lappend(fscan->fdw_private, makeString(sql.data));
+ fscan->fdw_private = lappend(fscan->fdw_private,
makeInteger(canSetTag));
+ fscan->fdw_private = lappend(fscan->fdw_private,
+
makeInteger((retrieved_attrs != NIL)));
+ fscan->fdw_private = lappend(fscan->fdw_private, retrieved_attrs);
+
+ /*
+ * Rrewrite the targetlist for an UPDATE command for safety of
ExecProject.
+ * Note we ignore and do not reference result tuples in direct update
case.
+ */
+ if (operation == CMD_UPDATE)
+ {
+ ListCell *lc;
+ int attrno = 1;
+ int numattrs =
RelationGetNumberOfAttributes(rel);
+
+ foreach(lc, subplan->targetlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+ if (tle->resjunk)
+ {
+ new_tlist = lappend(new_tlist, tle);
+ continue;
+ }
+
+ if (attrno > numattrs)
+ ereport(ERROR,
+
(errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("table row type and
query-specified row type do not match"),
+ errdetail("Query has too many
columns.")));
+
+ if (!list_member_int(targetAttrs, attrno))
+ new_tlist = lappend(new_tlist, tle);
+ else
+ {
+ Form_pg_attribute attr;
+ Oid atttype;
+ int32 atttypmod;
+ Oid attcollation;
+ Node *new_expr;
+ TargetEntry *new_tle;
+
+ attr = rel->rd_att->attrs[attrno - 1];
+
+ Assert(!attr->attisdropped);
+ atttype = attr->atttypid;
+ atttypmod = attr->atttypmod;
+ attcollation = attr->attcollation;
+
+ new_expr = (Node *) makeVar(resultRelation,
+
attrno,
+
atttype,
+
atttypmod,
+
attcollation,
+
0);
+
+ new_tle = makeTargetEntry((Expr *) new_expr,
+
attrno,
+
pstrdup(NameStr(attr->attname)),
+
false);
+
+ new_tlist = lappend(new_tlist, new_tle);
+ }
+
+ attrno++;
+ }
+
+ if (attrno != numattrs + 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("table row type and
query-specified row type do not match"),
+ errdetail("Query has too few
columns.")));
+
+ subplan->targetlist = new_tlist;
+ }
+
+ /*
+ * Build the fdw_private list that will be available to the executor.
+ * Items in the list must match enum FdwModifyPrivateIndex, above.
+ */
+ fdw_private = list_make4(makeString(NULL), NIL, makeInteger(false),
NIL);
+
+ return fdw_private;
+ }
+
+ /*
* Create cursor for node's query with current parameter values.
*/
static void
***************
*** 2254,2272 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
* have PG_TRY blocks to ensure this happens.
*/
static void
! store_returning_result(PgFdwModifyState *fmstate,
! TupleTableSlot *slot, PGresult *res)
{
/* PGresult must be released before leaving this function. */
PG_TRY();
{
HeapTuple newtup;
! newtup = make_tuple_from_result_row(res, 0,
!
fmstate->rel,
!
fmstate->attinmeta,
!
fmstate->retrieved_attrs,
!
fmstate->temp_cxt);
/* tuple will be deleted when it is cleared from the slot */
ExecStoreTuple(newtup, slot, InvalidBuffer, true);
}
--- 2680,2703 ----
* have PG_TRY blocks to ensure this happens.
*/
static void
! store_returning_result(TupleTableSlot *slot,
! PGresult *res,
! int row,
! Relation rel,
! AttInMetadata *attinmeta,
! List *retrieved_attrs,
! MemoryContext temp_context)
{
/* PGresult must be released before leaving this function. */
PG_TRY();
{
HeapTuple newtup;
! newtup = make_tuple_from_result_row(res, row,
!
rel,
!
attinmeta,
!
retrieved_attrs,
!
temp_context);
/* tuple will be deleted when it is cleared from the slot */
ExecStoreTuple(newtup, slot, InvalidBuffer, true);
}
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 66,75 **** extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root,
--- 66,87 ----
Index rtindex, Relation rel,
List *targetAttrs, List *returningList,
List **retrieved_attrs);
+ extern void deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
+ Index
rtindex, Relation rel,
+ List
*remote_conds,
+ List
*targetlist,
+ List
*targetAttrs,
+ List
*returningList,
+ List
**retrieved_attrs);
extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root,
Index rtindex, Relation rel,
List *returningList,
List **retrieved_attrs);
+ extern void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
+ Index
rtindex, Relation rel,
+ List
*remote_conds,
+ List
*returningList,
+ List
**retrieved_attrs);
extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel);
extern void deparseAnalyzeSql(StringInfo buf, Relation rel,
List **retrieved_attrs);
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers