On 2015/03/13 0:50, Tom Lane wrote:
I think the real fix as far as postgres_fdw is concerned is in fact
to let it adopt a different ROW_MARK strategy, since it has meaningful
ctid values.  However, that is not a one-size-fits-all answer.

The tableoid problem can be fixed much less invasively as per the attached
patch.  I think that we should continue to assume that ctid is not
meaningful (and hence should read as (4294967295,0)) in FDWs that use
ROW_MARK_COPY, and press forward on fixing the locking issues for
postgres_fdw by letting it use ROW_MARK_REFERENCE or something close to
that.  That would also cause ctid to read properly for rows from
postgres_fdw.

To support ROW_MARK_REFERENCE on (postgres_fdw) foreign tables, I'd like to propose the following FDW APIs:

RowMarkType
GetForeignRowMarkType(Oid relid,
                      LockClauseStrength strength);

Decide which rowmark type to use for a foreign table whose strength is LCS_NONE, i.e., either ROW_MARK_REFERENCE or ROW_MARK_COPY. (For now, the second argument is always LCS_NONE; it is there so that the API can be extended to the other lock strengths later.) This is called from select_rowmark_type() in the planner.
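
As a rough illustration of the intended planner-side behavior (not the patch code itself), here is a standalone C sketch. The enums are simplified stand-ins for the PostgreSQL ones, and choose_foreign_rowmark() and example_get_rowmark_type() are hypothetical names. The callback is consulted only when strength is LCS_NONE and the FDW provides one; any result other than ROW_MARK_REFERENCE or ROW_MARK_COPY is rejected; ROW_MARK_COPY remains the fallback.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the PostgreSQL enums; not the real definitions. */
typedef enum { LCS_NONE, LCS_FORUPDATE } LockClauseStrength;
typedef enum { ROW_MARK_EXCLUSIVE, ROW_MARK_REFERENCE, ROW_MARK_COPY } RowMarkType;
typedef unsigned int Oid;

typedef RowMarkType (*GetForeignRowMarkType_function) (Oid relid,
                                                       LockClauseStrength strength);

/*
 * Hypothetical helper mirroring the foreign-table branch of
 * select_rowmark_type().  Returns 0 and sets *result on success, or -1 if
 * the callback returned a type other than REFERENCE/COPY (the patch raises
 * an error in that case).
 */
int
choose_foreign_rowmark(GetForeignRowMarkType_function cb, Oid relid,
                       LockClauseStrength strength, RowMarkType *result)
{
    *result = ROW_MARK_COPY;    /* fallback when the FDW has no say */

    if (strength == LCS_NONE && cb != NULL)
    {
        RowMarkType type = cb(relid, strength);

        if (type != ROW_MARK_REFERENCE && type != ROW_MARK_COPY)
            return -1;
        *result = type;
    }
    return 0;
}

/* Example callback: an FDW that always asks for ROW_MARK_REFERENCE. */
RowMarkType
example_get_rowmark_type(Oid relid, LockClauseStrength strength)
{
    (void) relid;
    assert(strength == LCS_NONE);
    return ROW_MARK_REFERENCE;
}
```

An FDW that does not set the callback keeps today's behavior, since the helper falls through to ROW_MARK_COPY.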

void
BeginForeignFetch(EState *estate,
                  ExecRowMark *erm,
                  List *fdw_private,
                  int eflags);

Begin a remote fetch. This is called during InitPlan() in the executor.

HeapTuple
ExecForeignFetch(EState *estate,
                 ExecRowMark *erm,
                 ItemPointer tupleid);

Re-fetch the specified tuple. This is called during EvalPlanQualFetchRowMarks() in the executor.
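
For postgres_fdw, the re-fetch amounts to running a parameterized remote SELECT with "WHERE ctid = $1" and passing the tuple ID in text form. The sketch below only illustrates that text form, "(block,offset)". MockTid and format_tid_param() are hypothetical stand-ins for ItemPointerData and the tid output function, not code from the patch.

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for ItemPointerData: block number plus offset. */
typedef struct MockTid
{
    uint32_t    block;
    uint16_t    offset;
} MockTid;

/* Write the "(block,offset)" text form into buf; returns snprintf's result. */
int
format_tid_param(const MockTid *tid, char *buf, size_t buflen)
{
    return snprintf(buf, buflen, "(%u,%u)",
                    (unsigned int) tid->block, (unsigned int) tid->offset);
}
```

With an invalid block number (0xFFFFFFFF) and offset 0, this produces "(4294967295,0)", which is the value the quoted text above expects ctid to read as under ROW_MARK_COPY.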

void
EndForeignFetch(EState *estate,
                ExecRowMark *erm);

End a remote fetch.  This is called during ExecEndPlan() in the executor.
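
To make the intended call order concrete, here is a minimal standalone sketch of the Begin/Exec/End lifecycle. All names (MockRowMark, MockFetchState, the mock_* functions, MOCK_EXPLAIN_ONLY) are hypothetical stand-ins, not PostgreSQL definitions. It assumes, as in the attached patch, that Begin leaves the per-rowmark state NULL in the EXPLAIN-only case, that the remote statement is prepared lazily on the first fetch, and that End tolerates a NULL state.

```c
#include <assert.h>
#include <stdlib.h>

#define MOCK_EXPLAIN_ONLY 0x1   /* stand-in for EXEC_FLAG_EXPLAIN_ONLY */

/* Stand-in for ExecRowMark: only the FDW-private state pointer is modeled. */
typedef struct MockRowMark
{
    void       *fdw_state;
} MockRowMark;

/* Stand-in for the FDW's per-rowmark fetch state. */
typedef struct MockFetchState
{
    int         prepared;       /* has the remote statement been prepared? */
    int         fetches;        /* number of re-fetches performed so far */
} MockFetchState;

/* Begin: allocate state, unless we are only explaining the plan. */
void
mock_begin_fetch(MockRowMark *erm, int eflags)
{
    erm->fdw_state = NULL;
    if (eflags & MOCK_EXPLAIN_ONLY)
        return;
    erm->fdw_state = calloc(1, sizeof(MockFetchState));
}

/* Exec: prepare on first use, then count one re-fetch. */
int
mock_exec_fetch(MockRowMark *erm)
{
    MockFetchState *st = (MockFetchState *) erm->fdw_state;

    assert(st != NULL);
    if (!st->prepared)
        st->prepared = 1;
    return ++st->fetches;
}

/* End: release state; a NULL state means EXPLAIN-only, so nothing to do. */
void
mock_end_fetch(MockRowMark *erm)
{
    if (erm->fdw_state == NULL)
        return;
    free(erm->fdw_state);
    erm->fdw_state = NULL;
}
```

The executor would call the Begin step once per rowmark, the Exec step once per tuple that needs rechecking, and the End step once at shutdown.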

I'd also like to propose adding a table/server option, row_mark_reference, to postgres_fdw. When a user sets the option to true for, e.g., a foreign table, ROW_MARK_REFERENCE is used for that table instead of ROW_MARK_COPY. The default is false (ROW_MARK_COPY), and a per-table setting overrides a per-server one.
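
The lookup order for the option can be sketched as follows. This is a standalone illustration with a hypothetical MockOption type and resolve_row_mark_reference() helper, not the catalog-based code in the patch: start from false, apply the server's setting if present, then let the table's setting override it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a boolean DefElem: option name plus value. */
typedef struct MockOption
{
    const char *name;
    bool        value;
} MockOption;

/*
 * Returns true if ROW_MARK_REFERENCE should be used, false for
 * ROW_MARK_COPY.  Server options are scanned first, table options second,
 * so a table-level setting wins when both are present.
 */
bool
resolve_row_mark_reference(const MockOption *server_opts, size_t nserver,
                           const MockOption *table_opts, size_t ntable)
{
    bool        result = false; /* default: ROW_MARK_COPY */
    size_t      i;

    assert(server_opts != NULL || nserver == 0);
    assert(table_opts != NULL || ntable == 0);

    for (i = 0; i < nserver; i++)
    {
        if (strcmp(server_opts[i].name, "row_mark_reference") == 0)
            result = server_opts[i].value;
    }
    for (i = 0; i < ntable; i++)
    {
        if (strcmp(table_opts[i].name, "row_mark_reference") == 0)
            result = table_opts[i].value;
    }
    return result;
}
```

Scanning the table options last is what gives them precedence; no explicit "is set" flag is needed.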

Attached is a WIP patch; it does not yet include docs or regression tests.

I'd appreciate any early comments.

Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/option.c
--- b/contrib/postgres_fdw/option.c
***************
*** 105,111 **** postgres_fdw_validator(PG_FUNCTION_ARGS)
  		 * Validate option value, when we can do so without any context.
  		 */
  		if (strcmp(def->defname, "use_remote_estimate") == 0 ||
! 			strcmp(def->defname, "updatable") == 0)
  		{
  			/* these accept only boolean values */
  			(void) defGetBoolean(def);
--- 105,112 ----
  		 * Validate option value, when we can do so without any context.
  		 */
  		if (strcmp(def->defname, "use_remote_estimate") == 0 ||
! 			strcmp(def->defname, "updatable") == 0 ||
! 			strcmp(def->defname, "row_mark_reference") == 0)
  		{
  			/* these accept only boolean values */
  			(void) defGetBoolean(def);
***************
*** 153,158 **** InitPgFdwOptions(void)
--- 154,162 ----
  		/* updatable is available on both server and table */
  		{"updatable", ForeignServerRelationId, false},
  		{"updatable", ForeignTableRelationId, false},
+ 		/* row_mark_reference is available on both server and table */
+ 		{"row_mark_reference", ForeignServerRelationId, false},
+ 		{"row_mark_reference", ForeignTableRelationId, false},
  		{NULL, InvalidOid, false}
  	};
  
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 124,129 **** enum FdwModifyPrivateIndex
--- 124,144 ----
  };
  
  /*
+  * Similarly, this enum describes what's kept in the fdw_private list for
+  * a PlanRowMark node referencing a postgres_fdw foreign table.  We store:
+  *
+  * 1) SELECT statement text to be sent to the remote server
+  * 2) Integer list of attribute numbers retrieved by SELECT
+  */
+ enum FdwFetchPrivateIndex
+ {
+ 	/* SQL statement to execute remotely (as a String node) */
+ 	FdwFetchPrivateSelectSql,
+ 	/* Integer list of attribute numbers retrieved by SELECT */
+ 	FdwFetchPrivateRetrievedAttrs
+ };
+ 
+ /*
   * Execution state of a foreign scan using postgres_fdw.
   */
  typedef struct PgFdwScanState
***************
*** 186,191 **** typedef struct PgFdwModifyState
--- 201,230 ----
  } PgFdwModifyState;
  
  /*
+  * Execution state of a foreign fetch operation.
+  */
+ typedef struct PgFdwFetchState
+ {
+ 	Relation	rel;			/* relcache entry for the foreign table */
+ 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
+ 
+ 	/* for remote query execution */
+ 	PGconn	   *conn;			/* connection for the fetch */
+ 	char	   *p_name;			/* name of prepared statement, if created */
+ 
+ 	/* extracted fdw_private data */
+ 	char	   *query;			/* text of SELECT command */
+ 	List	   *retrieved_attrs;	/* attr numbers retrieved by SELECT */
+ 
+ 	/* info about parameters for prepared statement */
+ 	int			p_nums;			/* number of parameters to transmit */
+ 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
+ 
+ 	/* working memory context */
+ 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+ } PgFdwFetchState;
+ 
+ /*
   * Workspace for analyzing a foreign table.
   */
  typedef struct PgFdwAnalyzeState
***************
*** 276,281 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate,
--- 315,330 ----
  static void postgresEndForeignModify(EState *estate,
  						 ResultRelInfo *resultRelInfo);
  static int	postgresIsForeignRelUpdatable(Relation rel);
+ static RowMarkType postgresGetForeignRowMarkType(Oid relid,
+ 												 LockClauseStrength strength);
+ static void postgresBeginForeignFetch(EState *estate,
+ 									  ExecRowMark *erm,
+ 									  List *fdw_private,
+ 									  int eflags);
+ static HeapTuple postgresExecForeignFetch(EState *estate,
+ 										  ExecRowMark *erm,
+ 										  ItemPointer tupleid);
+ static void postgresEndForeignFetch(EState *estate, ExecRowMark *erm);
  static void postgresExplainForeignScan(ForeignScanState *node,
  						   ExplainState *es);
  static void postgresExplainForeignModify(ModifyTableState *mtstate,
***************
*** 306,318 **** static void get_remote_estimate(const char *sql,
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
--- 355,373 ----
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ 									   RelOptInfo *baserel,
+ 									   RowMarkType markType);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! 											 TupleTableSlot *slot,
! 											 int p_nums,
! 											 FmgrInfo *p_flinfo,
! 											 List *target_attrs,
! 											 MemoryContext temp_context);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
***************
*** 358,363 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 413,424 ----
  	routine->EndForeignModify = postgresEndForeignModify;
  	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
  
+ 	/* Functions for EvalPlanQual rechecking */
+ 	routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
+ 	routine->BeginForeignFetch = postgresBeginForeignFetch;
+ 	routine->ExecForeignFetch = postgresExecForeignFetch;
+ 	routine->EndForeignFetch = postgresEndForeignFetch;
+ 
  	/* Support functions for EXPLAIN */
  	routine->ExplainForeignScan = postgresExplainForeignScan;
  	routine->ExplainForeignModify = postgresExplainForeignModify;
***************
*** 850,855 **** postgresGetForeignPlan(PlannerInfo *root,
--- 911,923 ----
  					appendStringInfoString(&sql, " FOR UPDATE");
  					break;
  			}
+ 
+ 			/*
+ 			 * Build the fdw_private list that will be available to
+ 			 * EvalPlanQual that re-fetches tuples from the foreign table.
+ 			 */
+ 			rc->fdw_private = create_foreign_fetch_info(root, baserel,
+ 														rc->markType);
  		}
  	}
  
***************
*** 1391,1400 **** postgresExecForeignInsert(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1459,1472 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(NULL, slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1451,1457 **** postgresExecForeignUpdate(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1523,1529 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
***************
*** 1462,1470 **** postgresExecForeignUpdate(EState *estate,
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1534,1545 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1521,1527 **** postgresExecForeignDelete(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1596,1602 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
***************
*** 1532,1540 **** postgresExecForeignDelete(EState *estate,
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										NULL);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1607,1618 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										NULL,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1656,1661 **** postgresIsForeignRelUpdatable(Relation rel)
--- 1734,1962 ----
  }
  
  /*
+  * postgresGetForeignRowMarkType
+  *		Get rowmark type that we use for a foreign table.
+  */
+ static RowMarkType
+ postgresGetForeignRowMarkType(Oid relid,
+ 							  LockClauseStrength strength)
+ {
+ 	bool		row_mark_reference;
+ 	ForeignTable *table;
+ 	ForeignServer *server;
+ 	ListCell   *lc;
+ 
+ 	Assert(strength == LCS_NONE);
+ 
+ 	/*
+ 	 * By default, we use ROW_MARK_COPY for all postgres_fdw foreign tables.
+ 	 * However, this can be overridden by a per-server setting, which in turn
+ 	 * can be overridden by a per-table setting.
+ 	 */
+ 	row_mark_reference = false;	/* false means ROW_MARK_COPY */
+ 
+ 	table = GetForeignTable(relid);
+ 	server = GetForeignServer(table->serverid);
+ 
+ 	foreach(lc, server->options)
+ 	{
+ 		DefElem    *def = (DefElem *) lfirst(lc);
+ 
+ 		if (strcmp(def->defname, "row_mark_reference") == 0)
+ 			row_mark_reference = defGetBoolean(def);
+ 	}
+ 	foreach(lc, table->options)
+ 	{
+ 		DefElem    *def = (DefElem *) lfirst(lc);
+ 
+ 		if (strcmp(def->defname, "row_mark_reference") == 0)
+ 			row_mark_reference = defGetBoolean(def);
+ 	}
+ 
+ 	return row_mark_reference ? ROW_MARK_REFERENCE : ROW_MARK_COPY;
+ }
+ 
+ /*
+  * postgresBeginForeignFetch
+  *		Begin a fetch operation on a foreign table
+  */
+ static void
+ postgresBeginForeignFetch(EState *estate,
+ 						  ExecRowMark *erm,
+ 						  List *fdw_private,
+ 						  int eflags)
+ {
+ 	PgFdwFetchState *ffstate;
+ 	Relation	rel = erm->relation;
+ 	RangeTblEntry *rte;
+ 	Oid			userid;
+ 	ForeignTable *table;
+ 	ForeignServer *server;
+ 	UserMapping *user;
+ 	Oid			typefnoid;
+ 	bool		isvarlena;
+ 
+ 	/*
+ 	 * Do nothing in EXPLAIN (no ANALYZE) case.  erm->fdw_state stays NULL.
+ 	 */
+ 	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ 		return;
+ 
+ 	/* Begin constructing PgFdwFetchState. */
+ 	ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ 	ffstate->rel = rel;
+ 
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match what
+ 	 * ExecCheckRTEPerms() does.
+ 	 */
+ 	rte = rt_fetch(erm->rti, estate->es_range_table);
+ 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ 
+ 	/* Get info about foreign table. */
+ 	table = GetForeignTable(RelationGetRelid(rel));
+ 	server = GetForeignServer(table->serverid);
+ 	user = GetUserMapping(userid, server->serverid);
+ 
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	ffstate->conn = GetConnection(server, user, true);
+ 	ffstate->p_name = NULL;		/* prepared statement not made yet */
+ 
+ 	/* Deconstruct fdw_private data. */
+ 	ffstate->query = strVal(list_nth(fdw_private,
+ 									 FdwFetchPrivateSelectSql));
+ 	ffstate->retrieved_attrs = (List *) list_nth(fdw_private,
+ 											 FdwFetchPrivateRetrievedAttrs);
+ 
+ 	/* Create context for per-tuple temp workspace. */
+ 	ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											  "postgres_fdw temporary data",
+ 											  ALLOCSET_SMALL_MINSIZE,
+ 											  ALLOCSET_SMALL_INITSIZE,
+ 											  ALLOCSET_SMALL_MAXSIZE);
+ 
+ 	/* Prepare for input conversion of SELECT results. */
+ 	ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+ 
+ 	/* Prepare for output conversion of parameters used in prepared stmt. */
+ 	ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
+ 	ffstate->p_nums = 0;
+ 
+ 	/* Only one transmittable parameter will be ctid */
+ 	getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ 	fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]);
+ 	ffstate->p_nums++;
+ 
+ 	erm->fdw_state = ffstate;
+ }
+ 
+ /*
+  * postgresExecForeignFetch
+  *		Fetch the specified tuple from a foreign table
+  */
+ static HeapTuple
+ postgresExecForeignFetch(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 ItemPointer tupleid)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ 	const char **p_values;
+ 	PGresult   *res;
+ 	HeapTuple	tuple;
+ 
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!ffstate->p_name)
+ 		ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+ 
+ 	/* Convert parameters needed by prepared statement to text form */
+ 	p_values = convert_prep_stmt_params(tupleid, NULL,
+ 										ffstate->p_nums,
+ 										ffstate->p_flinfo,
+ 										NIL,
+ 										ffstate->temp_cxt);
+ 
+ 	/*
+ 	 * Execute the prepared statement, and check for success.
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = PQexecPrepared(ffstate->conn,
+ 						 ffstate->p_name,
+ 						 ffstate->p_nums,
+ 						 p_values,
+ 						 NULL,
+ 						 NULL,
+ 						 0);
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+ 
+ 	/* PGresult must be released before leaving this function. */
+ 	PG_TRY();
+ 	{
+ 		/* Create the tuple */
+ 		tuple = make_tuple_from_result_row(res, 0,
+ 										   ffstate->rel,
+ 										   ffstate->attinmeta,
+ 										   ffstate->retrieved_attrs,
+ 										   ffstate->temp_cxt);
+ 		tuple->t_self = *tupleid;
+ 		tuple->t_tableOid = erm->relid;
+ 
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		if (res)
+ 			PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 
+ 	MemoryContextReset(ffstate->temp_cxt);
+ 
+ 	return tuple;
+ }
+ 
+ /*
+  * postgresEndForeignFetch
+  *		Finish a fetch operation on a foreign table
+  */
+ static void
+ postgresEndForeignFetch(EState *estate, ExecRowMark *erm)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ 
+ 	/* If ffstate is NULL, we are in EXPLAIN; nothing to do */
+ 	if (ffstate == NULL)
+ 		return;
+ 
+ 	/* If we created a prepared statement, destroy it */
+ 	if (ffstate->p_name)
+ 	{
+ 		char		sql[64];
+ 		PGresult   *res;
+ 
+ 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name);
+ 
+ 		/*
+ 		 * We don't use a PG_TRY block here, so be careful not to throw error
+ 		 * without releasing the PGresult.
+ 		 */
+ 		res = PQexec(ffstate->conn, sql);
+ 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, res, ffstate->conn, true, sql);
+ 		PQclear(res);
+ 		ffstate->p_name = NULL;
+ 	}
+ 
+ 	/* Release remote connection */
+ 	ReleaseConnection(ffstate->conn);
+ 	ffstate->conn = NULL;
+ }
+ 
+ /*
   * postgresExplainForeignScan
   *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
   */
***************
*** 1918,1923 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 2219,2258 ----
  }
  
  /*
+  * Create the FDW-private information for a PlanRowMark node.
+  */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ 							RelOptInfo *baserel,
+ 							RowMarkType markType)
+ {
+ 	StringInfoData sql;
+ 	List	   *retrieved_attrs;
+ 	Bitmapset  *attrs_used = NULL;
+ 
+ 	Assert(markType == ROW_MARK_REFERENCE || markType == ROW_MARK_COPY);
+ 
+ 	if (markType == ROW_MARK_COPY)
+ 		return NIL;
+ 
+ 	/*
+ 	 * Build the query string to be sent for execution.
+ 	 */
+ 	initStringInfo(&sql);
+ 	/* Add a whole-row var to attrs_used to retrieve all the columns. */
+ 	attrs_used = bms_add_member(attrs_used,
+ 								0 - FirstLowInvalidHeapAttributeNumber);
+ 	deparseSelectSql(&sql, root, baserel, attrs_used, &retrieved_attrs);
+ 	appendStringInfoString(&sql, " WHERE ctid = $1");
+ 
+ 	/*
+ 	 * Build the fdw_private list that will be available to the executor.
+ 	 * Items in the list must match enum FdwFetchPrivateIndex, above.
+ 	 */
+ 	return list_make2(makeString(sql.data), retrieved_attrs);
+ }
+ 
+ /*
   * Create cursor for node's query with current parameter values.
   */
  static void
***************
*** 2154,2164 **** close_cursor(PGconn *conn, unsigned int cursor_number)
  }
  
  /*
!  * prepare_foreign_modify
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
   */
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
--- 2489,2500 ----
  }
  
  /*
!  * setup_prep_stmt
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+  *		or re-fetching tuples for EvalPlanQual rechecking
   */
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
***************
*** 2166,2172 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(fmstate->conn));
  	p_name = pstrdup(prep_name);
  
  	/*
--- 2502,2508 ----
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(conn));
  	p_name = pstrdup(prep_name);
  
  	/*
***************
*** 2179,2196 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(fmstate->conn,
! 					p_name,
! 					fmstate->query,
! 					0,
! 					NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	fmstate->p_name = p_name;
  }
  
  /*
--- 2515,2528 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(conn, p_name, query, 0, NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, conn, true, query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	return p_name;
  }
  
  /*
***************
*** 2203,2238 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
  
! 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && fmstate->target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, fmstate->target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
--- 2535,2573 ----
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(temp_context);
  
! 	p_values = (const char **) palloc(sizeof(char *) * p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
***************
*** 2242,2248 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
--- 2577,2583 ----
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
***************
*** 2250,2256 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == fmstate->p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
--- 2585,2591 ----
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 855,860 **** InitPlan(QueryDesc *queryDesc, int eflags)
--- 855,878 ----
  		erm->markType = rc->markType;
  		erm->waitPolicy = rc->waitPolicy;
  		ItemPointerSetInvalid(&(erm->curCtid));
+ 		erm->fdw_state = NULL;
+ 		if (relation && relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ 		{
+ 			/*
+ 			 * Tell the FDW to init fdw_state.
+ 			 */
+ 			FdwRoutine *fdwroutine;
+ 
+ 			Assert(rc->markType == ROW_MARK_REFERENCE);
+ 
+ 			fdwroutine = GetFdwRoutineForRelation(relation, false);
+ 			if (fdwroutine->BeginForeignFetch != NULL)
+ 				fdwroutine->BeginForeignFetch(estate,
+ 											  erm,
+ 											  rc->fdw_private,
+ 											  eflags);
+ 		}
+ 
  		estate->es_rowMarks = lappend(estate->es_rowMarks, erm);
  	}
  
***************
*** 1098,1103 **** CheckValidResultRel(Relation resultRel, CmdType operation)
--- 1116,1123 ----
  static void
  CheckValidRowMarkRel(Relation rel, RowMarkType markType)
  {
+ 	FdwRoutine *fdwroutine;
+ 
  	switch (rel->rd_rel->relkind)
  	{
  		case RELKIND_RELATION:
***************
*** 1133,1143 **** CheckValidRowMarkRel(Relation rel, RowMarkType markType)
  							  RelationGetRelationName(rel))));
  			break;
  		case RELKIND_FOREIGN_TABLE:
! 			/* Should not get here; planner should have used ROW_MARK_COPY */
! 			ereport(ERROR,
! 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 					 errmsg("cannot lock rows in foreign table \"%s\"",
! 							RelationGetRelationName(rel))));
  			break;
  		default:
  			ereport(ERROR,
--- 1153,1171 ----
  							  RelationGetRelationName(rel))));
  			break;
  		case RELKIND_FOREIGN_TABLE:
! 			/* Okay only if the FDW supports it */
! 			fdwroutine = GetFdwRoutineForRelation(rel, false);
! 			if (fdwroutine->ExecForeignFetch == NULL)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! 						 errmsg("cannot lock rows in foreign table \"%s\"",
! 								RelationGetRelationName(rel))));
! 			/* Allow referencing the table, but not actual locking clauses */
! 			if (markType != ROW_MARK_REFERENCE)
! 				ereport(ERROR,
! 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
! 						 errmsg("cannot lock rows in foreign table \"%s\"",
! 								RelationGetRelationName(rel))));
  			break;
  		default:
  			ereport(ERROR,
***************
*** 1446,1452 **** ExecEndPlan(PlanState *planstate, EState *estate)
--- 1474,1494 ----
  		ExecRowMark *erm = (ExecRowMark *) lfirst(l);
  
  		if (erm->relation)
+ 		{
+ 			if (erm->relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ 			{
+ 				/*
+ 				 * let the FDW shut down
+ 				 */
+ 				FdwRoutine *fdwroutine;
+ 
+ 				fdwroutine = GetFdwRoutineForRelation(erm->relation, false);
+ 				if (fdwroutine->EndForeignFetch != NULL)
+ 					fdwroutine->EndForeignFetch(estate, erm);
+ 			}
+ 
  			heap_close(erm->relation, NoLock);
+ 		}
  	}
  }
  
***************
*** 2401,2407 **** EvalPlanQualFetchRowMarks(EPQState *epqstate)
  
  		if (erm->markType == ROW_MARK_REFERENCE)
  		{
! 			Buffer		buffer;
  
  			Assert(erm->relation != NULL);
  
--- 2443,2449 ----
  
  		if (erm->markType == ROW_MARK_REFERENCE)
  		{
! 			HeapTuple	copyTuple;
  
  			Assert(erm->relation != NULL);
  
***************
*** 2414,2428 **** EvalPlanQualFetchRowMarks(EPQState *epqstate)
  				continue;
  			tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
  
! 			/* okay, fetch the tuple */
! 			if (!heap_fetch(erm->relation, SnapshotAny, &tuple, &buffer,
! 							false, NULL))
! 				elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
  
! 			/* successful, copy and store tuple */
! 			EvalPlanQualSetTuple(epqstate, erm->rti,
! 								 heap_copytuple(&tuple));
! 			ReleaseBuffer(buffer);
  		}
  		else
  		{
--- 2456,2489 ----
  				continue;
  			tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
  
! 			if (erm->relation->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
! 			{
! 				/*
! 				 * let the FDW fetch the tuple
! 				 */
! 				FdwRoutine *fdwroutine;
  
! 				fdwroutine = GetFdwRoutineForRelation(erm->relation, false);
! 				copyTuple = fdwroutine->ExecForeignFetch(epqstate->estate,
! 														 erm,
! 														 &tuple.t_self);
! 			}
! 			else
! 			{
! 				Buffer		buffer;
! 
! 				/* okay, fetch the tuple */
! 				if (!heap_fetch(erm->relation, SnapshotAny, &tuple, &buffer,
! 								false, NULL))
! 					elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
! 
! 				/* successful, copy tuple */
! 				copyTuple = heap_copytuple(&tuple);
! 				ReleaseBuffer(buffer);
! 			}
! 
! 			/* store tuple */
! 			EvalPlanQualSetTuple(epqstate, erm->rti, copyTuple);
  		}
  		else
  		{
*** a/src/backend/nodes/copyfuncs.c
--- b/src/backend/nodes/copyfuncs.c
***************
*** 995,1000 **** _copyPlanRowMark(const PlanRowMark *from)
--- 995,1001 ----
  	COPY_SCALAR_FIELD(strength);
  	COPY_SCALAR_FIELD(waitPolicy);
  	COPY_SCALAR_FIELD(isParent);
+ 	COPY_NODE_FIELD(fdw_private);
  
  	return newnode;
  }
*** a/src/backend/nodes/outfuncs.c
--- b/src/backend/nodes/outfuncs.c
***************
*** 856,861 **** _outPlanRowMark(StringInfo str, const PlanRowMark *node)
--- 856,862 ----
  	WRITE_ENUM_FIELD(strength, LockClauseStrength);
  	WRITE_ENUM_FIELD(waitPolicy, LockWaitPolicy);
  	WRITE_BOOL_FIELD(isParent);
+ 	WRITE_NODE_FIELD(fdw_private);
  }
  
  static void
*** a/src/backend/optimizer/plan/planner.c
--- b/src/backend/optimizer/plan/planner.c
***************
*** 20,25 ****
--- 20,26 ----
  #include "access/htup_details.h"
  #include "executor/executor.h"
  #include "executor/nodeAgg.h"
+ #include "foreign/fdwapi.h"
  #include "miscadmin.h"
  #include "nodes/makefuncs.h"
  #ifdef OPTIMIZER_DEBUG
***************
*** 2229,2234 **** preprocess_rowmarks(PlannerInfo *root)
--- 2230,2236 ----
  		newrc->strength = rc->strength;
  		newrc->waitPolicy = rc->waitPolicy;
  		newrc->isParent = false;
+ 		newrc->fdw_private = NIL;
  
  		prowmarks = lappend(prowmarks, newrc);
  	}
***************
*** 2254,2259 **** preprocess_rowmarks(PlannerInfo *root)
--- 2256,2262 ----
  		newrc->strength = LCS_NONE;
  		newrc->waitPolicy = LockWaitBlock;		/* doesn't matter */
  		newrc->isParent = false;
+ 		newrc->fdw_private = NIL;
  
  		prowmarks = lappend(prowmarks, newrc);
  	}
***************
*** 2274,2280 **** select_rowmark_type(RangeTblEntry *rte, LockClauseStrength strength)
  	}
  	else if (rte->relkind == RELKIND_FOREIGN_TABLE)
  	{
! 		/* For now, we force all foreign tables to use ROW_MARK_COPY */
  		return ROW_MARK_COPY;
  	}
  	else
--- 2277,2298 ----
  	}
  	else if (rte->relkind == RELKIND_FOREIGN_TABLE)
  	{
! 		/* For a non-target, non-locked table, let the FDW select the type */
! 		if (strength == LCS_NONE)
! 		{
! 			FdwRoutine *fdwroutine;
! 			RowMarkType type;
! 
! 			fdwroutine = GetFdwRoutineByRelId(rte->relid);
! 			if (fdwroutine->GetForeignRowMarkType != NULL)
! 			{
! 				type = fdwroutine->GetForeignRowMarkType(rte->relid, strength);
! 				if (type != ROW_MARK_REFERENCE && type != ROW_MARK_COPY)
! 					elog(ERROR, "unrecognized RowMarkType %d", (int) type);
! 				return type;
! 			}
! 		}
! 		/* Otherwise, we force all foreign tables to use ROW_MARK_COPY */
  		return ROW_MARK_COPY;
  	}
  	else
*** a/src/backend/optimizer/prep/prepunion.c
--- b/src/backend/optimizer/prep/prepunion.c
***************
*** 1395,1400 **** expand_inherited_rtentry(PlannerInfo *root, RangeTblEntry *rte, Index rti)
--- 1395,1401 ----
  			newrc->strength = oldrc->strength;
  			newrc->waitPolicy = oldrc->waitPolicy;
  			newrc->isParent = false;
+ 			newrc->fdw_private = NIL;
  
  			/* Include child's rowmark type in parent's allMarkTypes */
  			oldrc->allMarkTypes |= newrc->allMarkTypes;
*** a/src/include/foreign/fdwapi.h
--- b/src/include/foreign/fdwapi.h
***************
*** 13,18 ****
--- 13,19 ----
  #define FDWAPI_H
  
  #include "nodes/execnodes.h"
+ #include "nodes/plannodes.h"
  #include "nodes/relation.h"
  
  /* To avoid including explain.h here, reference ExplainState thus: */
***************
*** 82,87 **** typedef void (*EndForeignModify_function) (EState *estate,
--- 83,102 ----
  
  typedef int (*IsForeignRelUpdatable_function) (Relation rel);
  
+ typedef RowMarkType (*GetForeignRowMarkType_function) (Oid relid,
+ 													   LockClauseStrength strength);
+ 
+ typedef void (*BeginForeignFetch_function) (EState *estate,
+ 											ExecRowMark *erm,
+ 											List *fdw_private,
+ 											int eflags);
+ 
+ typedef HeapTuple (*ExecForeignFetch_function) (EState *estate,
+ 												ExecRowMark *erm,
+ 												ItemPointer tupleid);
+ 
+ typedef void (*EndForeignFetch_function) (EState *estate, ExecRowMark *erm);
+ 
  typedef void (*ExplainForeignScan_function) (ForeignScanState *node,
  													struct ExplainState *es);
  
***************
*** 141,146 **** typedef struct FdwRoutine
--- 156,167 ----
  	EndForeignModify_function EndForeignModify;
  	IsForeignRelUpdatable_function IsForeignRelUpdatable;
  
+ 	/* Functions for EvalPlanQual rechecking */
+ 	GetForeignRowMarkType_function GetForeignRowMarkType;
+ 	BeginForeignFetch_function BeginForeignFetch;
+ 	ExecForeignFetch_function ExecForeignFetch;
+ 	EndForeignFetch_function EndForeignFetch;
+ 
  	/* Support functions for EXPLAIN */
  	ExplainForeignScan_function ExplainForeignScan;
  	ExplainForeignModify_function ExplainForeignModify;
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 420,426 **** typedef struct EState
   * subqueries-in-FROM will have an ExecRowMark with relation == NULL.  See
   * PlanRowMark for details about most of the fields.  In addition to fields
   * directly derived from PlanRowMark, we store curCtid, which is used by the
!  * WHERE CURRENT OF code.
   *
   * EState->es_rowMarks is a list of these structs.
   */
--- 420,427 ----
   * subqueries-in-FROM will have an ExecRowMark with relation == NULL.  See
   * PlanRowMark for details about most of the fields.  In addition to fields
   * directly derived from PlanRowMark, we store curCtid, which is used by the
!  * WHERE CURRENT OF code, and fdw_state, where an FDW can keep private state
!  * for re-fetching rows of a foreign table.
   *
   * EState->es_rowMarks is a list of these structs.
   */
***************
*** 434,439 **** typedef struct ExecRowMark
--- 435,441 ----
  	RowMarkType markType;		/* see enum in nodes/plannodes.h */
  	LockWaitPolicy waitPolicy;	/* NOWAIT and SKIP LOCKED */
  	ItemPointerData curCtid;	/* ctid of currently locked tuple, if any */
+ 	void	   *fdw_state;		/* foreign-data wrapper can keep state here */
  } ExecRowMark;
  
  /*
*** a/src/include/nodes/plannodes.h
--- b/src/include/nodes/plannodes.h
***************
*** 808,818 **** typedef struct Limit
   * FUNCTION scans) we have to copy the whole row value.  ROW_MARK_COPY is
   * pretty inefficient, since most of the time we'll never need the data; but
   * fortunately the case is not performance-critical in practice.  Note that
!  * we use ROW_MARK_COPY for non-target foreign tables, even if the FDW has a
!  * concept of rowid and so could theoretically support some form of
!  * ROW_MARK_REFERENCE.  Although copying the whole row value is inefficient,
!  * it's probably still faster than doing a second remote fetch, so it doesn't
!  * seem worth the extra complexity to permit ROW_MARK_REFERENCE.
   */
  typedef enum RowMarkType
  {
--- 808,816 ----
   * FUNCTION scans) we have to copy the whole row value.  ROW_MARK_COPY is
   * pretty inefficient, since most of the time we'll never need the data; but
   * fortunately the case is not performance-critical in practice.  Note that
!  * we use ROW_MARK_COPY for non-target foreign tables by default.  However,
!  * we allow the FDW to use ROW_MARK_REFERENCE for its foreign tables if the
!  * FDW has a concept of rowid and supports the ROW_MARK_REFERENCE operation.
   */
  typedef enum RowMarkType
  {
***************
*** 875,880 **** typedef struct PlanRowMark
--- 873,879 ----
  	LockClauseStrength strength;	/* LockingClause's strength, or LCS_NONE */
  	LockWaitPolicy waitPolicy;	/* NOWAIT and SKIP LOCKED options */
  	bool		isParent;		/* true if this is a "dummy" parent entry */
+ 	List	   *fdw_private;	/* private data for FDW, if foreign table */
  } PlanRowMark;
  
  
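For readers who want the planner-side behaviour without applying the patch, here is a minimal standalone sketch of the decision the select_rowmark_type() hunk makes for foreign tables. The type definitions are simplified stand-ins, not the real PostgreSQL headers, and select_foreign_rowmark_type() and always_reference() are hypothetical names used only for illustration. The assert() on the callback's result stands in for the elog(ERROR, ...) check in the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the PostgreSQL types (assumptions, not the real headers). */
typedef unsigned int Oid;

typedef enum LockClauseStrength
{
	LCS_NONE,					/* no locking clause on this relation */
	LCS_FORUPDATE				/* other strengths omitted for brevity */
} LockClauseStrength;

typedef enum RowMarkType
{
	ROW_MARK_REFERENCE,			/* re-fetch the row by its tuple id */
	ROW_MARK_COPY				/* carry a copy of the whole row */
} RowMarkType;

typedef struct FdwRoutine
{
	/* Optional callback; NULL means the FDW does not provide it. */
	RowMarkType (*GetForeignRowMarkType) (Oid relid,
										  LockClauseStrength strength);
} FdwRoutine;

/* Hypothetical FDW callback that always asks for ROW_MARK_REFERENCE. */
RowMarkType
always_reference(Oid relid, LockClauseStrength strength)
{
	(void) relid;
	(void) strength;
	return ROW_MARK_REFERENCE;
}

/*
 * Hypothetical helper mirroring the foreign-table branch of
 * select_rowmark_type(): consult the FDW only when strength is LCS_NONE
 * and the callback is set; otherwise fall back to ROW_MARK_COPY.
 */
RowMarkType
select_foreign_rowmark_type(const FdwRoutine *routine, Oid relid,
							LockClauseStrength strength)
{
	assert(routine != NULL);

	if (strength == LCS_NONE && routine->GetForeignRowMarkType != NULL)
	{
		RowMarkType type = routine->GetForeignRowMarkType(relid, strength);

		/* The patch raises an error here; the sketch just asserts. */
		assert(type == ROW_MARK_REFERENCE || type == ROW_MARK_COPY);
		return type;
	}

	/* Otherwise, force ROW_MARK_COPY as before. */
	return ROW_MARK_COPY;
}
```

With this shape, an FDW that leaves GetForeignRowMarkType unset keeps the existing ROW_MARK_COPY behaviour, and the callback is not consulted at all when a locking clause applies to the table.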