On 2015/03/13 0:50, Tom Lane wrote:
> I think the real fix as far as postgres_fdw is concerned is in fact
> to let it adopt a different ROW_MARK strategy, since it has meaningful
> ctid values. However, that is not a one-size-fits-all answer.
> The tableoid problem can be fixed much less invasively as per the attached
> patch. I think that we should continue to assume that ctid is not
> meaningful (and hence should read as (4294967295,0)) in FDWs that use
> ROW_MARK_COPY, and press forward on fixing the locking issues for
> postgres_fdw by letting it use ROW_MARK_REFERENCE or something close to
> that. That would also cause ctid to read properly for rows from
> postgres_fdw.
To support ROW_MARK_REFERENCE on (postgres_fdw) foreign tables, I'd like
to propose the following FDW APIs:
RowMarkType
GetForeignRowMarkType(Oid relid,
LockClauseStrength strength);
Decide which rowmark type to use for a foreign table whose rowmark has
strength = LCS_NONE, i.e., either ROW_MARK_REFERENCE or ROW_MARK_COPY.
(For now, the second argument is always LCS_NONE; it is there to allow a
possible extension to the other strengths.) This is called during
select_rowmark_type() in the planner.
void
BeginForeignFetch(EState *estate,
ExecRowMark *erm,
List *fdw_private,
int eflags);
Begin a remote fetch. This is called during InitPlan() in the executor.
HeapTuple
ExecForeignFetch(EState *estate,
ExecRowMark *erm,
ItemPointer tupleid);
Re-fetch the specified tuple. This is called during
EvalPlanQualFetchRowMarks() in the executor.
void
EndForeignFetch(EState *estate,
ExecRowMark *erm);
End a remote fetch. This is called during ExecEndPlan() in the executor.
I'd also like to propose adding a table/server option,
row_mark_reference, to postgres_fdw. When a user sets the option to
true for, e.g., a foreign table, ROW_MARK_REFERENCE will be used for
that table instead of ROW_MARK_COPY.
Attached is a WIP patch; it does not yet include docs or regression tests.
I'd appreciate any early comments.
Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/option.c
--- b/contrib/postgres_fdw/option.c
***************
*** 105,111 **** postgres_fdw_validator(PG_FUNCTION_ARGS)
* Validate option value, when we can do so without any context.
*/
if (strcmp(def->defname, "use_remote_estimate") == 0 ||
! strcmp(def->defname, "updatable") == 0)
{
/* these accept only boolean values */
(void) defGetBoolean(def);
--- 105,112 ----
* Validate option value, when we can do so without any context.
*/
if (strcmp(def->defname, "use_remote_estimate") == 0 ||
! strcmp(def->defname, "updatable") == 0 ||
! strcmp(def->defname, "row_mark_reference") == 0)
{
/* these accept only boolean values */
(void) defGetBoolean(def);
***************
*** 153,158 **** InitPgFdwOptions(void)
--- 154,162 ----
/* updatable is available on both server and table */
{"updatable", ForeignServerRelationId, false},
{"updatable", ForeignTableRelationId, false},
+ /* row_mark_reference is available on both server and table */
+ {"row_mark_reference", ForeignServerRelationId, false},
+ {"row_mark_reference", ForeignTableRelationId, false},
{NULL, InvalidOid, false}
};
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 124,129 **** enum FdwModifyPrivateIndex
--- 124,144 ----
};
/*
+ * Similarly, this enum describes what's kept in the fdw_private list for
+ * a PlanRowMark node referencing a postgres_fdw foreign table. We store:
+ *
+ * 1) SELECT statement text to be sent to the remote server
+ * 2) Integer list of attribute numbers retrieved by SELECT
+ */
+ enum FdwFetchPrivateIndex
+ {
+ /* SQL statement to execute remotely (as a String node) */
+ FdwFetchPrivateSelectSql,
+ /* Integer list of attribute numbers retrieved by SELECT */
+ FdwFetchPrivateRetrievedAttrs
+ };
+
+ /*
* Execution state of a foreign scan using postgres_fdw.
*/
typedef struct PgFdwScanState
***************
*** 186,191 **** typedef struct PgFdwModifyState
--- 201,230 ----
} PgFdwModifyState;
/*
+ * Execution state of a foreign fetch operation.
+ */
+ typedef struct PgFdwFetchState
+ {
+ Relation rel; /* relcache entry for the foreign table */
+ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
+
+ /* for remote query execution */
+ PGconn *conn; /* connection for the fetch */
+ char *p_name; /* name of prepared statement, if created */
+
+ /* extracted fdw_private data */
+ char *query; /* text of SELECT command */
+ List *retrieved_attrs; /* attr numbers retrieved by SELECT */
+
+ /* info about parameters for prepared statement */
+ int p_nums; /* number of parameters to transmit */
+ FmgrInfo *p_flinfo; /* output conversion functions for them */
+
+ /* working memory context */
+ MemoryContext temp_cxt; /* context for per-tuple temporary data */
+ } PgFdwFetchState;
+
+ /*
* Workspace for analyzing a foreign table.
*/
typedef struct PgFdwAnalyzeState
***************
*** 276,281 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate,
--- 315,330 ----
static void postgresEndForeignModify(EState *estate,
ResultRelInfo *resultRelInfo);
static int postgresIsForeignRelUpdatable(Relation rel);
+ static RowMarkType postgresGetForeignRowMarkType(Oid relid,
+ LockClauseStrength strength);
+ static void postgresBeginForeignFetch(EState *estate,
+ ExecRowMark *erm,
+ List *fdw_private,
+ int eflags);
+ static HeapTuple postgresExecForeignFetch(EState *estate,
+ ExecRowMark *erm,
+ ItemPointer tupleid);
+ static void postgresEndForeignFetch(EState *estate, ExecRowMark *erm);
static void postgresExplainForeignScan(ForeignScanState *node,
ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
***************
*** 306,318 **** static void get_remote_estimate(const char *sql,
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! ItemPointer tupleid,
! TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
--- 355,373 ----
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ RelOptInfo *baserel,
+ RowMarkType markType);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! TupleTableSlot *slot,
! int p_nums,
! FmgrInfo *p_flinfo,
! List *target_attrs,
! MemoryContext temp_context);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
***************
*** 358,363 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 413,424 ----
routine->EndForeignModify = postgresEndForeignModify;
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
+ /* Functions for EvalPlanQual rechecking */
+ routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
+ routine->BeginForeignFetch = postgresBeginForeignFetch;
+ routine->ExecForeignFetch = postgresExecForeignFetch;
+ routine->EndForeignFetch = postgresEndForeignFetch;
+
/* Support functions for EXPLAIN */
routine->ExplainForeignScan = postgresExplainForeignScan;
routine->ExplainForeignModify = postgresExplainForeignModify;
***************
*** 850,855 **** postgresGetForeignPlan(PlannerInfo *root,
--- 911,923 ----
appendStringInfoString(&sql, " FOR UPDATE");
break;
}
+
+ /*
+ * Build the fdw_private list that will be available to
+ * EvalPlanQual that re-fetches tuples from the foreign table.
+ */
+ rc->fdw_private = create_foreign_fetch_info(root, baserel,
+ rc->markType);
}
}
***************
*** 1391,1400 **** postgresExecForeignInsert(EState *estate,
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! prepare_foreign_modify(fmstate);
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(fmstate, NULL, slot);
/*
* Execute the prepared statement, and check for success.
--- 1459,1472 ----
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(NULL, slot,
! fmstate->p_nums,
! fmstate->p_flinfo,
! fmstate->target_attrs,
! fmstate->temp_cxt);
/*
* Execute the prepared statement, and check for success.
***************
*** 1451,1457 **** postgresExecForeignUpdate(EState *estate,
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! prepare_foreign_modify(fmstate);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
--- 1523,1529 ----
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
***************
*** 1462,1470 **** postgresExecForeignUpdate(EState *estate,
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(fmstate,
! (ItemPointer) DatumGetPointer(datum),
! slot);
/*
* Execute the prepared statement, and check for success.
--- 1534,1545 ----
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! slot,
! fmstate->p_nums,
! fmstate->p_flinfo,
! fmstate->target_attrs,
! fmstate->temp_cxt);
/*
* Execute the prepared statement, and check for success.
***************
*** 1521,1527 **** postgresExecForeignDelete(EState *estate,
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! prepare_foreign_modify(fmstate);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
--- 1596,1602 ----
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
/* Get the ctid that was passed up as a resjunk column */
datum = ExecGetJunkAttribute(planSlot,
***************
*** 1532,1540 **** postgresExecForeignDelete(EState *estate,
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params(fmstate,
! (ItemPointer) DatumGetPointer(datum),
! NULL);
/*
* Execute the prepared statement, and check for success.
--- 1607,1618 ----
elog(ERROR, "ctid is NULL");
/* Convert parameters needed by prepared statement to text form */
! p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! NULL,
! fmstate->p_nums,
! fmstate->p_flinfo,
! fmstate->target_attrs,
! fmstate->temp_cxt);
/*
* Execute the prepared statement, and check for success.
***************
*** 1656,1661 **** postgresIsForeignRelUpdatable(Relation rel)
--- 1734,1962 ----
}
/*
+ * postgresGetForeignRowMarkType
+ * Get rowmark type that we use for a foreign table.
+ */
+ static RowMarkType
+ postgresGetForeignRowMarkType(Oid relid,
+ LockClauseStrength strength)
+ {
+ bool row_mark_reference;
+ ForeignTable *table;
+ ForeignServer *server;
+ ListCell *lc;
+
+ Assert(strength == LCS_NONE);
+
+ /*
+ * By default, we use ROW_MARK_COPY for all postgres_fdw foreign tables.
+ * However, this can be overridden by a per-server setting, which in turn
+ * can be overridden by a per-table setting.
+ */
+ row_mark_reference = false; /* false means ROW_MARK_COPY */
+
+ table = GetForeignTable(relid);
+ server = GetForeignServer(table->serverid);
+
+ foreach(lc, server->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "row_mark_reference") == 0)
+ row_mark_reference = defGetBoolean(def);
+ }
+ foreach(lc, table->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "row_mark_reference") == 0)
+ row_mark_reference = defGetBoolean(def);
+ }
+
+ return row_mark_reference ? ROW_MARK_REFERENCE : ROW_MARK_COPY;
+ }
+
+ /*
+ * postgresBeginForeignFetch
+ * Begin a fetch operation on a foreign table
+ */
+ static void
+ postgresBeginForeignFetch(EState *estate,
+ ExecRowMark *erm,
+ List *fdw_private,
+ int eflags)
+ {
+ PgFdwFetchState *ffstate;
+ Relation rel = erm->relation;
+ RangeTblEntry *rte;
+ Oid userid;
+ ForeignTable *table;
+ ForeignServer *server;
+ UserMapping *user;
+ Oid typefnoid;
+ bool isvarlena;
+
+ /*
+ * Do nothing in EXPLAIN (no ANALYZE) case. erm->fdw_state stays NULL.
+ */
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ return;
+
+ /* Begin constructing PgFdwFetchState. */
+ ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ ffstate->rel = rel;
+
+ /*
+ * Identify which user to do the remote access as. This should match what
+ * ExecCheckRTEPerms() does.
+ */
+ rte = rt_fetch(erm->rti, estate->es_range_table);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ /* Get info about foreign table. */
+ table = GetForeignTable(RelationGetRelid(rel));
+ server = GetForeignServer(table->serverid);
+ user = GetUserMapping(userid, server->serverid);
+
+ /* Open connection; report that we'll create a prepared statement. */
+ ffstate->conn = GetConnection(server, user, true);
+ ffstate->p_name = NULL; /* prepared statement not made yet */
+
+ /* Deconstruct fdw_private data. */
+ ffstate->query = strVal(list_nth(fdw_private,
+ FdwFetchPrivateSelectSql));
+ ffstate->retrieved_attrs = (List *) list_nth(fdw_private,
+ FdwFetchPrivateRetrievedAttrs);
+
+ /* Create context for per-tuple temp workspace. */
+ ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ "postgres_fdw temporary data",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+
+ /* Prepare for input conversion of SELECT results. */
+ ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+
+ /* Prepare for output conversion of parameters used in prepared stmt. */
+ ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+ ffstate->p_nums = 0;
+
+ /* Only one transmittable parameter will be ctid */
+ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]);
+ ffstate->p_nums++;
+
+ erm->fdw_state = ffstate;
+ }
+
+ /*
+ * postgresExecForeignFetch
+ * Fetch the specified tuple from a foreign table
+ */
+ static HeapTuple
+ postgresExecForeignFetch(EState *estate,
+ ExecRowMark *erm,
+ ItemPointer tupleid)
+ {
+ PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ const char **p_values;
+ PGresult *res;
+ HeapTuple tuple;
+
+ /* Set up the prepared statement on the remote server, if we didn't yet */
+ if (!ffstate->p_name)
+ ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+
+ /* Convert parameters needed by prepared statement to text form */
+ p_values = convert_prep_stmt_params(tupleid, NULL,
+ ffstate->p_nums,
+ ffstate->p_flinfo,
+ NIL,
+ ffstate->temp_cxt);
+
+ /*
+ * Execute the prepared statement, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexecPrepared(ffstate->conn,
+ ffstate->p_name,
+ ffstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+
+ /* PGresult must be released before leaving this function. */
+ PG_TRY();
+ {
+ /* Create the tuple */
+ tuple = make_tuple_from_result_row(res, 0,
+ ffstate->rel,
+ ffstate->attinmeta,
+ ffstate->retrieved_attrs,
+ ffstate->temp_cxt);
+ tuple->t_self = *tupleid;
+ tuple->t_tableOid = erm->relid;
+
+ PQclear(res);
+ res = NULL;
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ MemoryContextReset(ffstate->temp_cxt);
+
+ return tuple;
+ }
+
+ /*
+ * postgresEndForeignFetch
+ * Finish a fetch operation on a foreign table
+ */
+ static void
+ postgresEndForeignFetch(EState *estate, ExecRowMark *erm)
+ {
+ PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+
+ /* If ffstate is NULL, we are in EXPLAIN; nothing to do */
+ if (ffstate == NULL)
+ return;
+
+ /* If we created a prepared statement, destroy it */
+ if (ffstate->p_name)
+ {
+ char sql[64];
+ PGresult *res;
+
+ snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexec(ffstate->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, ffstate->conn, true, sql);
+ PQclear(res);
+ ffstate->p_name = NULL;
+ }
+
+ /* Release remote connection */
+ ReleaseConnection(ffstate->conn);
+ ffstate->conn = NULL;
+ }
+
+ /*
* postgresExplainForeignScan
* Produce extra output for EXPLAIN of a ForeignScan on a foreign table
*/
***************
*** 1918,1923 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 2219,2258 ----
}
/*
+ * Create the FDW-private information for a PlanRowMark node.
+ */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ RelOptInfo *baserel,
+ RowMarkType markType)
+ {
+ StringInfoData sql;
+ List *retrieved_attrs;
+ Bitmapset *attrs_used = NULL;
+
+ Assert(markType == ROW_MARK_REFERENCE || markType == ROW_MARK_COPY);
+
+ if (markType == ROW_MARK_COPY)
+ return NIL;
+
+ /*
+ * Build the query string to be sent for execution.
+ */
+ initStringInfo(&sql);
+ /* Add a whole-row var to attrs_used to retrieve all the columns. */
+ attrs_used = bms_add_member(attrs_used,
+ 0 - FirstLowInvalidHeapAttributeNumber);
+ deparseSelectSql(&sql, root, baserel, attrs_used, &retrieved_attrs);
+ appendStringInfoString(&sql, " WHERE ctid = $1");
+
+ /*
+ * Build the fdw_private list that will be available to the executor.
+ * Items in the list must match enum FdwFetchPrivateIndex, above.
+ */
+ return list_make2(makeString(sql.data), retrieved_attrs);
+ }
+
+ /*
* Create cursor for node's query with current parameter values.
*/
static void
***************
*** 2154,2164 **** close_cursor(PGconn *conn, unsigned int cursor_number)
}
/*
! * prepare_foreign_modify
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
*/
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
{
char prep_name[NAMEDATALEN];
char *p_name;
--- 2489,2500 ----
}
/*
! * setup_prep_stmt
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+ * or re-fetching tuples for EvalPlanQual rechecking
*/
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
{
char prep_name[NAMEDATALEN];
char *p_name;
***************
*** 2166,2172 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! GetPrepStmtNumber(fmstate->conn));
p_name = pstrdup(prep_name);
/*
--- 2502,2508 ----
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! GetPrepStmtNumber(conn));
p_name = pstrdup(prep_name);
/*
***************
*** 2179,2196 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! res = PQprepare(fmstate->conn,
! p_name,
! fmstate->query,
! 0,
! NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
! pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
/* This action shows that the prepare has been done. */
! fmstate->p_name = p_name;
}
/*
--- 2515,2528 ----
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
! res = PQprepare(conn, p_name, query, 0, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
! pgfdw_report_error(ERROR, res, conn, true, query);
PQclear(res);
/* This action shows that the prepare has been done. */
! return p_name;
}
/*
***************
*** 2203,2238 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
* Data is constructed in temp_cxt; caller should reset that after use.
*/
static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! ItemPointer tupleid,
! TupleTableSlot *slot)
{
const char **p_values;
int pindex = 0;
MemoryContext oldcontext;
! oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
! p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
/* don't need set_transmission_modes for TID output */
! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
/* get following parameters from slot */
! if (slot != NULL && fmstate->target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
! foreach(lc, fmstate->target_attrs)
{
int attnum = lfirst_int(lc);
Datum value;
--- 2535,2573 ----
* Data is constructed in temp_cxt; caller should reset that after use.
*/
static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! TupleTableSlot *slot,
! int p_nums,
! FmgrInfo *p_flinfo,
! List *target_attrs,
! MemoryContext temp_context)
{
const char **p_values;
int pindex = 0;
MemoryContext oldcontext;
! oldcontext = MemoryContextSwitchTo(temp_context);
! p_values = (const char **) palloc(sizeof(char *) * p_nums);
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
/* don't need set_transmission_modes for TID output */
! p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
/* get following parameters from slot */
! if (slot != NULL && target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
! foreach(lc, target_attrs)
{
int attnum = lfirst_int(lc);
Datum value;
***************
*** 2242,2248 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
if (isnull)
p_values[pindex] = NULL;
else
! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
value);
pindex++;
}
--- 2577,2583 ----
if (isnull)
p_values[pindex] = NULL;
else
! p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
value);
pindex++;
}
***************
*** 2250,2256 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
reset_transmission_modes(nestlevel);
}
! Assert(pindex == fmstate->p_nums);
MemoryContextSwitchTo(oldcontext);
--- 2585,2591 ----
reset_transmission_modes(nestlevel);
}
! Assert(pindex == p_nums);
MemoryContextSwitchTo(oldcontext);
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 855,860 **** InitPlan(QueryDesc *queryDesc, int eflags)
--- 855,878 ----
erm->markType = rc->markType;
erm->waitPolicy = rc->waitPolicy;
ItemPointerSetInvalid(&(erm->curCtid));
+ erm->fdw_state = NULL;
+ if (relation && relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ {
+ /*
+ * Tell the FDW to init fdw_state.
+ */
+ FdwRoutine *fdwroutine;
+
+ Assert(rc->markType == ROW_MARK_REFERENCE);
+
+ fdwroutine = GetFdwRoutineForRelation(relation, false);
+ if (fdwroutine->BeginForeignFetch != NULL)
+ fdwroutine->BeginForeignFetch(estate,
+ erm,
+ rc->fdw_private,
+ eflags);
+ }
+
estate->es_rowMarks = lappend(estate->es_rowMarks, erm);
}
***************
*** 1098,1103 **** CheckValidResultRel(Relation resultRel, CmdType operation)
--- 1116,1123 ----
static void
CheckValidRowMarkRel(Relation rel, RowMarkType markType)
{
+ FdwRoutine *fdwroutine;
+
switch (rel->rd_rel->relkind)
{
case RELKIND_RELATION:
***************
*** 1133,1143 **** CheckValidRowMarkRel(Relation rel, RowMarkType markType)
RelationGetRelationName(rel))));
break;
case RELKIND_FOREIGN_TABLE:
! /* Should not get here; planner should have used ROW_MARK_COPY */
! ereport(ERROR,
! (errcode(ERRCODE_WRONG_OBJECT_TYPE),
! errmsg("cannot lock rows in foreign table \"%s\"",
! RelationGetRelationName(rel))));
break;
default:
ereport(ERROR,
--- 1153,1171 ----
RelationGetRelationName(rel))));
break;
case RELKIND_FOREIGN_TABLE:
! /* Okay only if the FDW supports it */
! fdwroutine = GetFdwRoutineForRelation(rel, false);
! if (fdwroutine->ExecForeignFetch == NULL)
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("cannot lock rows in foreign table \"%s\"",
! RelationGetRelationName(rel))));
! /* Allow referencing the table, but not actual locking clauses */
! if (markType != ROW_MARK_REFERENCE)
! ereport(ERROR,
! (errcode(ERRCODE_WRONG_OBJECT_TYPE),
! errmsg("cannot lock rows in foreign table \"%s\"",
! RelationGetRelationName(rel))));
break;
default:
ereport(ERROR,
***************
*** 1446,1452 **** ExecEndPlan(PlanState *planstate, EState *estate)
--- 1474,1494 ----
ExecRowMark *erm = (ExecRowMark *) lfirst(l);
if (erm->relation)
+ {
+ if (erm->relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ {
+ /*
+ * let the FDW shut down
+ */
+ FdwRoutine *fdwroutine;
+
+ fdwroutine = GetFdwRoutineForRelation(erm->relation, false);
+ if (fdwroutine->EndForeignFetch != NULL)
+ fdwroutine->EndForeignFetch(estate, erm);
+ }
+
heap_close(erm->relation, NoLock);
+ }
}
}
***************
*** 2401,2407 **** EvalPlanQualFetchRowMarks(EPQState *epqstate)
if (erm->markType == ROW_MARK_REFERENCE)
{
! Buffer buffer;
Assert(erm->relation != NULL);
--- 2443,2449 ----
if (erm->markType == ROW_MARK_REFERENCE)
{
! HeapTuple copyTuple;
Assert(erm->relation != NULL);
***************
*** 2414,2428 **** EvalPlanQualFetchRowMarks(EPQState *epqstate)
continue;
tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
! /* okay, fetch the tuple */
! if (!heap_fetch(erm->relation, SnapshotAny, &tuple, &buffer,
! false, NULL))
! elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
! /* successful, copy and store tuple */
! EvalPlanQualSetTuple(epqstate, erm->rti,
! heap_copytuple(&tuple));
! ReleaseBuffer(buffer);
}
else
{
--- 2456,2489 ----
continue;
tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
! if (erm->relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
! {
! /*
! * let the FDW fetch the tuple
! */
! FdwRoutine *fdwroutine;
! fdwroutine = GetFdwRoutineForRelation(erm->relation, false);
! copyTuple = fdwroutine->ExecForeignFetch(epqstate->estate,
! erm,
! &tuple.t_self);
! }
! else
! {
! Buffer buffer;
!
! /* okay, fetch the tuple */
! if (!heap_fetch(erm->relation, SnapshotAny, &tuple, &buffer,
! false, NULL))
! elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
!
! /* successful, copy tuple */
! copyTuple = heap_copytuple(&tuple);
! ReleaseBuffer(buffer);
! }
!
! /* store tuple */
! EvalPlanQualSetTuple(epqstate, erm->rti, copyTuple);
}
else
{
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
***************
*** 995,1000 **** _copyPlanRowMark(const PlanRowMark *from)
--- 995,1001 ----
COPY_SCALAR_FIELD(strength);
COPY_SCALAR_FIELD(waitPolicy);
COPY_SCALAR_FIELD(isParent);
+ COPY_NODE_FIELD(fdw_private);
return newnode;
}
*** a/src/backend/nodes/outfuncs.c
--- b/src/backend/nodes/outfuncs.c
***************
*** 856,861 **** _outPlanRowMark(StringInfo str, const PlanRowMark *node)
--- 856,862 ----
WRITE_ENUM_FIELD(strength, LockClauseStrength);
WRITE_ENUM_FIELD(waitPolicy, LockWaitPolicy);
WRITE_BOOL_FIELD(isParent);
+ WRITE_NODE_FIELD(fdw_private);
}
static void
*** a/src/backend/optimizer/plan/planner.c
--- b/src/backend/optimizer/plan/planner.c
***************
*** 20,25 ****
--- 20,26 ----
#include "access/htup_details.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
+ #include "foreign/fdwapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#ifdef OPTIMIZER_DEBUG
***************
*** 2229,2234 **** preprocess_rowmarks(PlannerInfo *root)
--- 2230,2236 ----
newrc->strength = rc->strength;
newrc->waitPolicy = rc->waitPolicy;
newrc->isParent = false;
+ newrc->fdw_private = NIL;
prowmarks = lappend(prowmarks, newrc);
}
***************
*** 2254,2259 **** preprocess_rowmarks(PlannerInfo *root)
--- 2256,2262 ----
newrc->strength = LCS_NONE;
newrc->waitPolicy = LockWaitBlock; /* doesn't matter */
newrc->isParent = false;
+ newrc->fdw_private = NIL;
prowmarks = lappend(prowmarks, newrc);
}
***************
*** 2274,2280 **** select_rowmark_type(RangeTblEntry *rte, LockClauseStrength strength)
}
else if (rte->relkind == RELKIND_FOREIGN_TABLE)
{
! /* For now, we force all foreign tables to use ROW_MARK_COPY */
return ROW_MARK_COPY;
}
else
--- 2277,2298 ----
}
else if (rte->relkind == RELKIND_FOREIGN_TABLE)
{
! /* For a non-target, non-locked table, let the FDW select the type */
! if (strength == LCS_NONE)
! {
! FdwRoutine *fdwroutine;
! RowMarkType type;
!
! fdwroutine = GetFdwRoutineByRelId(rte->relid);
! if (fdwroutine->GetForeignRowMarkType != NULL)
! {
! type = fdwroutine->GetForeignRowMarkType(rte->relid, strength);
! if (type != ROW_MARK_REFERENCE && type != ROW_MARK_COPY)
! elog(ERROR, "unrecognized RowMarkType %d", (int) type);
! return type;
! }
! }
! /* Otherwise, we force all foreign tables to use ROW_MARK_COPY */
return ROW_MARK_COPY;
}
else
*** a/src/backend/optimizer/prep/prepunion.c
--- b/src/backend/optimizer/prep/prepunion.c
***************
*** 1395,1400 **** expand_inherited_rtentry(PlannerInfo *root, RangeTblEntry *rte, Index rti)
--- 1395,1401 ----
newrc->strength = oldrc->strength;
newrc->waitPolicy = oldrc->waitPolicy;
newrc->isParent = false;
+ newrc->fdw_private = NIL;
/* Include child's rowmark type in parent's allMarkTypes */
oldrc->allMarkTypes |= newrc->allMarkTypes;
*** a/src/include/foreign/fdwapi.h
--- b/src/include/foreign/fdwapi.h
***************
*** 13,18 ****
--- 13,19 ----
#define FDWAPI_H
#include "nodes/execnodes.h"
+ #include "nodes/plannodes.h"
#include "nodes/relation.h"
/* To avoid including explain.h here, reference ExplainState thus: */
***************
*** 82,87 **** typedef void (*EndForeignModify_function) (EState *estate,
--- 83,102 ----
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
+ typedef RowMarkType (*GetForeignRowMarkType_function) (Oid relid,
+ LockClauseStrength strength);
+
+ typedef void (*BeginForeignFetch_function) (EState *estate,
+ ExecRowMark *erm,
+ List *fdw_private,
+ int eflags);
+
+ typedef HeapTuple (*ExecForeignFetch_function) (EState *estate,
+ ExecRowMark *erm,
+ ItemPointer tupleid);
+
+ typedef void (*EndForeignFetch_function) (EState *estate, ExecRowMark *erm);
+
typedef void (*ExplainForeignScan_function) (ForeignScanState *node,
struct ExplainState *es);
***************
*** 141,146 **** typedef struct FdwRoutine
--- 156,167 ----
EndForeignModify_function EndForeignModify;
IsForeignRelUpdatable_function IsForeignRelUpdatable;
+ /* Functions for EvalPlanQual rechecking */
+ GetForeignRowMarkType_function GetForeignRowMarkType;
+ BeginForeignFetch_function BeginForeignFetch;
+ ExecForeignFetch_function ExecForeignFetch;
+ EndForeignFetch_function EndForeignFetch;
+
/* Support functions for EXPLAIN */
ExplainForeignScan_function ExplainForeignScan;
ExplainForeignModify_function ExplainForeignModify;
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 420,426 **** typedef struct EState
* subqueries-in-FROM will have an ExecRowMark with relation == NULL. See
* PlanRowMark for details about most of the fields. In addition to fields
* directly derived from PlanRowMark, we store curCtid, which is used by the
! * WHERE CURRENT OF code.
*
* EState->es_rowMarks is a list of these structs.
*/
--- 420,427 ----
* subqueries-in-FROM will have an ExecRowMark with relation == NULL. See
* PlanRowMark for details about most of the fields. In addition to fields
* directly derived from PlanRowMark, we store curCtid, which is used by the
! * WHERE CURRENT OF code, and fdw_state, which is used by the FDW referencing
! * a foreign table.
*
* EState->es_rowMarks is a list of these structs.
*/
***************
*** 434,439 **** typedef struct ExecRowMark
--- 435,441 ----
RowMarkType markType; /* see enum in nodes/plannodes.h */
LockWaitPolicy waitPolicy; /* NOWAIT and SKIP LOCKED */
ItemPointerData curCtid; /* ctid of currently locked tuple, if any */
+ void *fdw_state; /* foreign-data wrapper can keep state here */
} ExecRowMark;
/*
*** a/src/include/nodes/plannodes.h
--- b/src/include/nodes/plannodes.h
***************
*** 808,818 **** typedef struct Limit
* FUNCTION scans) we have to copy the whole row value. ROW_MARK_COPY is
* pretty inefficient, since most of the time we'll never need the data; but
* fortunately the case is not performance-critical in practice. Note that
! * we use ROW_MARK_COPY for non-target foreign tables, even if the FDW has a
! * concept of rowid and so could theoretically support some form of
! * ROW_MARK_REFERENCE. Although copying the whole row value is inefficient,
! * it's probably still faster than doing a second remote fetch, so it doesn't
! * seem worth the extra complexity to permit ROW_MARK_REFERENCE.
*/
typedef enum RowMarkType
{
--- 808,816 ----
* FUNCTION scans) we have to copy the whole row value. ROW_MARK_COPY is
* pretty inefficient, since most of the time we'll never need the data; but
* fortunately the case is not performance-critical in practice. Note that
! * we use ROW_MARK_COPY for non-target foreign tables by default. However,
! * we allow the FDW to use ROW_MARK_REFERENCE for its foreign tables if the
! * FDW has a concept of rowid and supports the ROW_MARK_REFERENCE operation.
*/
typedef enum RowMarkType
{
***************
*** 875,880 **** typedef struct PlanRowMark
--- 873,879 ----
LockClauseStrength strength; /* LockingClause's strength, or LCS_NONE */
LockWaitPolicy waitPolicy; /* NOWAIT and SKIP LOCKED options */
bool isParent; /* true if this is a "dummy" parent entry */
+ List *fdw_private; /* private data for FDW, if foreign table */
} PlanRowMark;
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers