Hello.
At Fri, 1 Jun 2018 10:21:39 -0400, Ashutosh Bapat
<[email protected]> wrote in
<CAFjFpRdraYcQnD4tKzNuP1uP6L-gnizi4HLU_UA=28q2m4z...@mail.gmail.com>
> I am not suggesting to commit 0003 in my patch set, but just 0001 and
> 0002 which just raise an error when multiple rows get updated when
> only one row is expected to be updated.
I reconsidered Tom's suggestion and found a way to fix this
problem without changing the FDW API.
To make use of PARAM_EXECs here, the attached PoC patch does the
following things. No changes in the core side.
- postgresAddForeignUpdateTargets is no longer useful, thus it is
removed from fdw_function in the attached patch.
- postgresGetForeignRelSize registers the tableoid and ctid columns
in attrs_used and in a new member, param_attrs, for UPDATE and DELETE.
- postgresGetForeignPlan assigns two PARAM_EXECs for the two
values, then remembers the paramids in fdw_private.
- postgresPlanForeignModify searches for the parameters and
remembers their paramids.
After that, doing the following things fixes the issue.
- make_tuple_from_result_row receives the remote table oid and
stores it in the returned tuples.
- postgresIterateForeignScan stores the values into the remembered
parameters.
- postgresExecForeignUpdate/Delete read the parameters and
use them to identify the remote target rows accurately.
It fails on some join-pushdown cases since it doesn't add the tid
columns to the join tlist. I suppose that build_tlist_to_deparse
needs some change, but I'll consider this further tomorrow.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index d272719ff4..503e705c4c 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1107,11 +1107,17 @@ deparseTargetList(StringInfo buf,
bool qualify_col,
List **retrieved_attrs)
{
+ static int check_attrs[4];
+ static char *check_attr_names[] = {"ctid", "oid", "tableoid"};
TupleDesc tupdesc = RelationGetDescr(rel);
bool have_wholerow;
bool first;
int i;
+ check_attrs[0] = SelfItemPointerAttributeNumber;
+ check_attrs[1] = ObjectIdAttributeNumber;
+ check_attrs[2] = TableOidAttributeNumber;
+ check_attrs[3] = FirstLowInvalidHeapAttributeNumber;
*retrieved_attrs = NIL;
/* If there's a whole-row reference, we'll need all the columns. */
@@ -1143,41 +1149,27 @@ deparseTargetList(StringInfo buf,
}
}
- /*
- * Add ctid and oid if needed. We currently don't support retrieving any
- * other system columns.
- */
- if (bms_is_member(SelfItemPointerAttributeNumber - FirstLowInvalidHeapAttributeNumber,
- attrs_used))
+ for (i = 0 ; check_attrs[i] != FirstLowInvalidHeapAttributeNumber ; i++)
{
- if (!first)
- appendStringInfoString(buf, ", ");
- else if (is_returning)
- appendStringInfoString(buf, " RETURNING ");
- first = false;
+ int attr = check_attrs[i];
+ char *attr_name = check_attr_names[i];
- if (qualify_col)
- ADD_REL_QUALIFIER(buf, rtindex);
- appendStringInfoString(buf, "ctid");
+ /* Add system columns if needed. */
+ if (bms_is_member(attr - FirstLowInvalidHeapAttributeNumber,
+ attrs_used))
+ {
+ if (!first)
+ appendStringInfoString(buf, ", ");
+ else if (is_returning)
+ appendStringInfoString(buf, " RETURNING ");
+ first = false;
- *retrieved_attrs = lappend_int(*retrieved_attrs,
- SelfItemPointerAttributeNumber);
- }
- if (bms_is_member(ObjectIdAttributeNumber - FirstLowInvalidHeapAttributeNumber,
- attrs_used))
- {
- if (!first)
- appendStringInfoString(buf, ", ");
- else if (is_returning)
- appendStringInfoString(buf, " RETURNING ");
- first = false;
+ if (qualify_col)
+ ADD_REL_QUALIFIER(buf, rtindex);
+ appendStringInfoString(buf, attr_name);
- if (qualify_col)
- ADD_REL_QUALIFIER(buf, rtindex);
- appendStringInfoString(buf, "oid");
-
- *retrieved_attrs = lappend_int(*retrieved_attrs,
- ObjectIdAttributeNumber);
+ *retrieved_attrs = lappend_int(*retrieved_attrs, attr);
+ }
}
/* Don't generate bad syntax if no undropped columns */
@@ -1725,7 +1717,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
deparseRelation(buf, rel);
appendStringInfoString(buf, " SET ");
- pindex = 2; /* ctid is always the first param */
+ pindex = 3; /* tableoid and ctid are always the first param */
first = true;
foreach(lc, targetAttrs)
{
@@ -1739,7 +1731,7 @@ deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
appendStringInfo(buf, " = $%d", pindex);
pindex++;
}
- appendStringInfoString(buf, " WHERE ctid = $1");
+ appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
deparseReturningList(buf, rte, rtindex, rel,
rel->trigdesc && rel->trigdesc->trig_update_after_row,
@@ -1855,7 +1847,7 @@ deparseDeleteSql(StringInfo buf, RangeTblEntry *rte,
{
appendStringInfoString(buf, "DELETE FROM ");
deparseRelation(buf, rel);
- appendStringInfoString(buf, " WHERE ctid = $1");
+ appendStringInfoString(buf, " WHERE tableoid = $1 AND ctid = $2");
deparseReturningList(buf, rte, rtindex, rel,
rel->trigdesc && rel->trigdesc->trig_delete_after_row,
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 78b0f43ca8..7557d9add7 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -73,7 +73,9 @@ enum FdwScanPrivateIndex
* String describing join i.e. names of relations being joined and types
* of join, added when the scan is join
*/
- FdwScanPrivateRelations
+ FdwScanPrivateRelations,
+
+ FdwScanTupleIdParamIds
};
/*
@@ -95,7 +97,8 @@ enum FdwModifyPrivateIndex
/* has-returning flag (as an integer Value node) */
FdwModifyPrivateHasReturning,
/* Integer list of attribute numbers retrieved by RETURNING */
- FdwModifyPrivateRetrievedAttrs
+ FdwModifyPrivateRetrievedAttrs,
+ FdwModifyPrivateTidParams
};
/*
@@ -156,6 +159,8 @@ typedef struct PgFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */
int fetch_size; /* number of tuples per fetch */
+
+ int *tid_params;
} PgFdwScanState;
/*
@@ -178,6 +183,7 @@ typedef struct PgFdwModifyState
/* info about parameters for prepared statement */
AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
+ int *tid_params;
int p_nums; /* number of parameters to transmit */
FmgrInfo *p_flinfo; /* output conversion functions for them */
@@ -293,9 +299,6 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);
-static void postgresAddForeignUpdateTargets(Query *parsetree,
- RangeTblEntry *target_rte,
- Relation target_relation);
static List *postgresPlanForeignModify(PlannerInfo *root,
ModifyTable *plan,
Index resultRelation,
@@ -388,9 +391,11 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
char *query,
List *target_attrs,
bool has_returning,
- List *retrieved_attrs);
+ List *retrieved_attrs,
+ int *tid_params);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
+ Oid tableoid,
ItemPointer tupleid,
TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
@@ -471,7 +476,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->EndForeignScan = postgresEndForeignScan;
/* Functions for updating foreign tables */
- routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
+ routine->AddForeignUpdateTargets = NULL;
routine->PlanForeignModify = postgresPlanForeignModify;
routine->BeginForeignModify = postgresBeginForeignModify;
routine->ExecForeignInsert = postgresExecForeignInsert;
@@ -595,6 +600,26 @@ postgresGetForeignRelSize(PlannerInfo *root,
&fpinfo->attrs_used);
}
+ /*
+ * ctid and tableoid are required for UPDATE and DELETE.
+ */
+ if (root->parse->commandType == CMD_UPDATE ||
+ root->parse->commandType == CMD_DELETE)
+ {
+ fpinfo->param_attrs =
+ bms_add_member(fpinfo->param_attrs,
+ SelfItemPointerAttributeNumber -
+ FirstLowInvalidHeapAttributeNumber);
+
+ fpinfo->param_attrs =
+ bms_add_member(fpinfo->param_attrs,
+ TableOidAttributeNumber -
+ FirstLowInvalidHeapAttributeNumber);
+
+ fpinfo->attrs_used =
+ bms_add_members(fpinfo->attrs_used, fpinfo->param_attrs);
+ }
+
/*
* Compute the selectivity and cost of the local_conds, so we don't have
* to do it over again for each path. The best we can do for these
@@ -1116,6 +1141,61 @@ postgresGetForeignPaths(PlannerInfo *root,
}
}
+/*
+ * Select a PARAM_EXEC number to identify the given Var as a parameter for
+ * the current subquery, or for a nestloop's inner scan.
+ * If the Var already has a param in the current context, return that one.
+ */
+static int
+assign_param_for_var(PlannerInfo *root, Var *var)
+{
+ ListCell *ppl;
+ PlannerParamItem *pitem;
+ Index levelsup;
+
+ /* Find the query level the Var belongs to */
+ for (levelsup = var->varlevelsup; levelsup > 0; levelsup--)
+ root = root->parent_root;
+
+ /* If there's already a matching PlannerParamItem there, just use it */
+ foreach(ppl, root->plan_params)
+ {
+ pitem = (PlannerParamItem *) lfirst(ppl);
+ if (IsA(pitem->item, Var))
+ {
+ Var *pvar = (Var *) pitem->item;
+
+ /*
+ * This comparison must match _equalVar(), except for ignoring
+ * varlevelsup. Note that _equalVar() ignores the location.
+ */
+ if (pvar->varno == var->varno &&
+ pvar->varattno == var->varattno &&
+ pvar->vartype == var->vartype &&
+ pvar->vartypmod == var->vartypmod &&
+ pvar->varcollid == var->varcollid &&
+ pvar->varnoold == var->varnoold &&
+ pvar->varoattno == var->varoattno)
+ return pitem->paramId;
+ }
+ }
+
+ /* Nope, so make a new one */
+ var = copyObject(var);
+ var->varlevelsup = 0;
+
+ pitem = makeNode(PlannerParamItem);
+ pitem->item = (Node *) var;
+ pitem->paramId = list_length(root->glob->paramExecTypes);
+ root->glob->paramExecTypes = lappend_oid(root->glob->paramExecTypes,
+ var->vartype);
+
+ root->plan_params = lappend(root->plan_params, pitem);
+
+ return pitem->paramId;
+}
+
+
/*
* postgresGetForeignPlan
* Create ForeignScan plan node which implements selected best path
@@ -1287,6 +1367,32 @@ postgresGetForeignPlan(PlannerInfo *root,
if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name->data));
+ if (!bms_is_empty(fpinfo->param_attrs))
+ {
+ int *paramids = palloc(sizeof(int) * 2);
+ Var *v;
+
+ if (list_length(fdw_private) == 3)
+ fdw_private = lappend(fdw_private, makeString(""));
+
+ v = makeNode(Var);
+ v->varno = foreignrel->relid;
+ v->vartype = OIDOID;
+ v->vartypmod = -1;
+ v->varcollid = InvalidOid;
+ v->varattno = TableOidAttributeNumber;
+ paramids[0] = assign_param_for_var(root, v);
+
+ v = makeNode(Var);
+ v->varno = foreignrel->relid;
+ v->vartype = TIDOID;
+ v->vartypmod = -1;
+ v->varcollid = InvalidOid;
+ v->varattno = SelfItemPointerAttributeNumber;
+ paramids[1] = assign_param_for_var(root, v);
+
+ fdw_private = lappend(fdw_private, paramids);
+ }
/*
* Create the ForeignScan node for the given relation.
@@ -1368,6 +1474,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
FdwScanPrivateRetrievedAttrs);
fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
FdwScanPrivateFetchSize));
+ if (list_length(fsplan->fdw_private) > FdwScanTupleIdParamIds)
+ fsstate->tid_params =
+ (int *) list_nth(fsplan->fdw_private, FdwScanTupleIdParamIds);
/* Create contexts for batches of tuples and per-tuple temp workspace. */
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -1418,6 +1527,8 @@ postgresIterateForeignScan(ForeignScanState *node)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ EState *estate = node->ss.ps.state;
+ HeapTuple tup;
/*
* If this is the first call after Begin or ReScan, we need to create the
@@ -1439,10 +1550,28 @@ postgresIterateForeignScan(ForeignScanState *node)
return ExecClearTuple(slot);
}
+ tup = fsstate->tuples[fsstate->next_tuple++];
+ if (fsstate->tid_params != NULL)
+ {
+ ParamExecData *prm;
+ ItemPointer itemp;
+
+ /* set toid */
+ prm = &(estate->es_param_exec_vals[fsstate->tid_params[0]]);
+ prm->value = ObjectIdGetDatum(tup->t_tableOid);
+ /* set ctid */
+ prm = &(estate->es_param_exec_vals[fsstate->tid_params[1]]);
+ itemp = (ItemPointer) palloc(sizeof(ItemPointerData));
+ ItemPointerSet(itemp,
+ ItemPointerGetBlockNumberNoCheck(&tup->t_self),
+ ItemPointerGetOffsetNumberNoCheck(&tup->t_self));
+ prm->value = PointerGetDatum(itemp);
+ }
+
/*
* Return the next tuple.
*/
- ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
+ ExecStoreTuple(tup,
slot,
InvalidBuffer,
false);
@@ -1530,41 +1659,41 @@ postgresEndForeignScan(ForeignScanState *node)
/* MemoryContexts will be deleted automatically. */
}
-/*
- * postgresAddForeignUpdateTargets
- * Add resjunk column(s) needed for update/delete on a foreign table
- */
-static void
-postgresAddForeignUpdateTargets(Query *parsetree,
- RangeTblEntry *target_rte,
- Relation target_relation)
+static int
+find_param_for_var(PlannerInfo *root, Var *var)
{
- Var *var;
- const char *attrname;
- TargetEntry *tle;
+ ListCell *ppl;
+ PlannerParamItem *pitem;
+ Index levelsup;
- /*
- * In postgres_fdw, what we need is the ctid, same as for a regular table.
- */
+ /* Find the query level the Var belongs to */
+ for (levelsup = var->varlevelsup; levelsup > 0; levelsup--)
+ root = root->parent_root;
- /* Make a Var representing the desired value */
- var = makeVar(parsetree->resultRelation,
- SelfItemPointerAttributeNumber,
- TIDOID,
- -1,
- InvalidOid,
- 0);
+ /* If there's already a matching PlannerParamItem there, just use it */
+ foreach(ppl, root->plan_params)
+ {
+ pitem = (PlannerParamItem *) lfirst(ppl);
+ if (IsA(pitem->item, Var))
+ {
+ Var *pvar = (Var *) pitem->item;
- /* Wrap it in a resjunk TLE with the right name ... */
- attrname = "ctid";
+ /*
+ * This comparison must match _equalVar(), except for ignoring
+ * varlevelsup. Note that _equalVar() ignores the location.
+ */
+ if (pvar->varno == var->varno &&
+ pvar->varattno == var->varattno &&
+ pvar->vartype == var->vartype &&
+ pvar->vartypmod == var->vartypmod &&
+ pvar->varcollid == var->varcollid &&
+ pvar->varnoold == var->varnoold &&
+ pvar->varoattno == var->varoattno)
+ return pitem->paramId;
+ }
+ }
- tle = makeTargetEntry((Expr *) var,
- list_length(parsetree->targetList) + 1,
- pstrdup(attrname),
- true);
-
- /* ... and add it to the query's targetlist */
- parsetree->targetList = lappend(parsetree->targetList, tle);
+ return -1;
}
/*
@@ -1585,6 +1714,7 @@ postgresPlanForeignModify(PlannerInfo *root,
List *returningList = NIL;
List *retrieved_attrs = NIL;
bool doNothing = false;
+ int *paramids = NULL;
initStringInfo(&sql);
@@ -1630,6 +1760,28 @@ postgresPlanForeignModify(PlannerInfo *root,
}
}
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ Var *v;
+
+ paramids = palloc(sizeof(int) * 2);
+ v = makeNode(Var);
+ v->varno = resultRelation;
+ v->vartype = OIDOID;
+ v->vartypmod = -1;
+ v->varcollid = InvalidOid;
+ v->varattno = TableOidAttributeNumber;
+ paramids[0] = find_param_for_var(root, v);
+ if (paramids[0] < 0)
+ elog(ERROR, "ERROR 1");
+
+ v->vartype = TIDOID;
+ v->varattno = SelfItemPointerAttributeNumber;
+ paramids[1] = find_param_for_var(root, v);
+ if (paramids[1] < 0)
+ elog(ERROR, "ERROR 2");
+ }
+
/*
* Extract the relevant RETURNING list if any.
*/
@@ -1679,10 +1831,11 @@ postgresPlanForeignModify(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwModifyPrivateIndex, above.
*/
- return list_make4(makeString(sql.data),
+ return list_make5(makeString(sql.data),
targetAttrs,
makeInteger((retrieved_attrs != NIL)),
- retrieved_attrs);
+ retrieved_attrs,
+ paramids);
}
/*
@@ -1702,6 +1855,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
bool has_returning;
List *retrieved_attrs;
RangeTblEntry *rte;
+ int *tid_params;
/*
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
@@ -1719,6 +1873,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateHasReturning));
retrieved_attrs = (List *) list_nth(fdw_private,
FdwModifyPrivateRetrievedAttrs);
+ tid_params = (int *) list_nth(fdw_private, FdwModifyPrivateTidParams);
/* Find RTE. */
rte = rt_fetch(resultRelInfo->ri_RangeTableIndex,
@@ -1733,7 +1888,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
query,
target_attrs,
has_returning,
- retrieved_attrs);
+ retrieved_attrs,
+ tid_params);
resultRelInfo->ri_FdwState = fmstate;
}
@@ -1758,7 +1914,7 @@ postgresExecForeignInsert(EState *estate,
prepare_foreign_modify(fmstate);
/* Convert parameters needed by prepared statement to text form */
- p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+ p_values = convert_prep_stmt_params(fmstate, InvalidOid, NULL, slot);
/*
* Execute the prepared statement.
@@ -1813,28 +1969,31 @@ postgresExecForeignUpdate(EState *estate,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
- Datum datum;
- bool isNull;
+ Datum toiddatum, ctiddatum;
const char **p_values;
PGresult *res;
int n_rows;
+ int *tid_params = fmstate->tid_params;
+ ParamExecData *prm;
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
+ Assert(tid_params);
+ /* Get the tableoid that was passed up as an exec param */
+ prm = &(estate->es_param_exec_vals[tid_params[0]]);
+ toiddatum = prm->value;
+
/* Get the ctid that was passed up as a resjunk column */
- datum = ExecGetJunkAttribute(planSlot,
- fmstate->ctidAttno,
- &isNull);
- /* shouldn't ever get a null result... */
- if (isNull)
- elog(ERROR, "ctid is NULL");
+ prm = &(estate->es_param_exec_vals[tid_params[1]]);
+ ctiddatum = prm->value;
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate,
- (ItemPointer) DatumGetPointer(datum),
- slot);
+ DatumGetObjectId(toiddatum),
+ (ItemPointer) DatumGetPointer(ctiddatum),
+ slot);
/*
* Execute the prepared statement.
@@ -1889,28 +2048,32 @@ postgresExecForeignDelete(EState *estate,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
- Datum datum;
- bool isNull;
+ Datum toiddatum, ctiddatum;
const char **p_values;
PGresult *res;
int n_rows;
+ int *tid_params = fmstate->tid_params;
+ ParamExecData *prm;
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
+ Assert(tid_params);
+
+ /* Get the tableoid that was passed up as an exec param */
+ prm = &(estate->es_param_exec_vals[tid_params[0]]);
+ toiddatum = prm->value;
+
/* Get the ctid that was passed up as a resjunk column */
- datum = ExecGetJunkAttribute(planSlot,
- fmstate->ctidAttno,
- &isNull);
- /* shouldn't ever get a null result... */
- if (isNull)
- elog(ERROR, "ctid is NULL");
+ prm = &(estate->es_param_exec_vals[tid_params[1]]);
+ ctiddatum = prm->value;
/* Convert parameters needed by prepared statement to text form */
p_values = convert_prep_stmt_params(fmstate,
- (ItemPointer) DatumGetPointer(datum),
- NULL);
+ DatumGetObjectId(toiddatum),
+ (ItemPointer) DatumGetPointer(ctiddatum),
+ NULL);
/*
* Execute the prepared statement.
@@ -2058,7 +2221,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
sql.data,
targetAttrs,
retrieved_attrs != NIL,
- retrieved_attrs);
+ retrieved_attrs,
+ NULL);
resultRelInfo->ri_FdwState = fmstate;
}
@@ -3286,7 +3450,8 @@ create_foreign_modify(EState *estate,
char *query,
List *target_attrs,
bool has_returning,
- List *retrieved_attrs)
+ List *retrieved_attrs,
+ int *tid_params)
{
PgFdwModifyState *fmstate;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -3333,7 +3498,7 @@ create_foreign_modify(EState *estate,
fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
/* Prepare for output conversion of parameters used in prepared stmt. */
- n_params = list_length(fmstate->target_attrs) + 1;
+ n_params = list_length(fmstate->target_attrs) + 2;
fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
fmstate->p_nums = 0;
@@ -3341,13 +3506,14 @@ create_foreign_modify(EState *estate,
{
Assert(subplan != NULL);
- /* Find the ctid resjunk column in the subplan's result */
- fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
- "ctid");
- if (!AttributeNumberIsValid(fmstate->ctidAttno))
- elog(ERROR, "could not find junk ctid column");
+ fmstate->tid_params = tid_params;
- /* First transmittable parameter will be ctid */
+ /* First transmittable parameter will be table oid */
+ getTypeOutputInfo(OIDOID, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ fmstate->p_nums++;
+
+ /* Second transmittable parameter will be ctid */
getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
fmstate->p_nums++;
@@ -3430,6 +3596,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
*/
static const char **
convert_prep_stmt_params(PgFdwModifyState *fmstate,
+ Oid tableoid,
ItemPointer tupleid,
TupleTableSlot *slot)
{
@@ -3441,10 +3608,13 @@ convert_prep_stmt_params(PgFdwModifyState *fmstate,
p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
- /* 1st parameter should be ctid, if it's in use */
- if (tupleid != NULL)
+ /* First two parameters should be tableoid and ctid, if they are in use */
+ if (tableoid != InvalidOid)
{
/* don't need set_transmission_modes for TID output */
+ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+ ObjectIdGetDatum(tableoid));
+ pindex++;
p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
@@ -5549,6 +5719,7 @@ make_tuple_from_result_row(PGresult *res,
bool *nulls;
ItemPointer ctid = NULL;
Oid oid = InvalidOid;
+ Oid toid = InvalidOid;
ConversionLocation errpos;
ErrorContextCallback errcallback;
MemoryContext oldcontext;
@@ -5642,6 +5813,17 @@ make_tuple_from_result_row(PGresult *res,
oid = DatumGetObjectId(datum);
}
}
+ else if (i == TableOidAttributeNumber)
+ {
+ /* table oid */
+ if (valstr != NULL)
+ {
+ Datum datum;
+
+ datum = DirectFunctionCall1(oidin, CStringGetDatum(valstr));
+ toid = DatumGetObjectId(datum);
+ }
+ }
errpos.cur_attno = 0;
j++;
@@ -5691,6 +5873,9 @@ make_tuple_from_result_row(PGresult *res,
if (OidIsValid(oid))
HeapTupleSetOid(tuple, oid);
+ if (OidIsValid(toid))
+ tuple->t_tableOid = toid;
+
/* Clean up */
MemoryContextReset(temp_context);
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a5d4011e8d..39e5581125 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -108,6 +108,8 @@ typedef struct PgFdwRelationInfo
* representing the relation.
*/
int relation_index;
+
+ Bitmapset *param_attrs; /* attrs required for modification */
} PgFdwRelationInfo;
/* in postgres_fdw.c */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index da7f52cab0..60a6fa849d 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1193,6 +1193,8 @@ typedef struct ScanState
Relation ss_currentRelation;
HeapScanDesc ss_currentScanDesc;
TupleTableSlot *ss_ScanTupleSlot;
+ int ntuple_infos;
+ Datum tuple_info[];
} ScanState;
/* ----------------