On 2015/05/11 8:50, Tom Lane wrote:
> Etsuro Fujita <fujita.ets...@lab.ntt.co.jp> writes:
>> [ EvalPlanQual-v6.patch ]
> 
> I've started to study this in a little more detail, and I'm not terribly
> happy with some of the API decisions in it.

Thanks for taking the time to review the patch!

> In particular, I find the addition of "void *fdw_state" to ExecRowMark
> to be pretty questionable.  That does not seem like a good place to keep
> random state.  (I realize that WHERE CURRENT OF keeps some state in
> ExecRowMark, but that's a crock not something to emulate.)  ISTM that in
> most scenarios, the state that LockForeignRow/FetchForeignRow would like
> to get at is probably the FDW state associated with the ForeignScanState
> that the tuple came from.  Which this API doesn't help with particularly.
> I wonder if we should instead add a "ScanState*" field and expect the
> core code to set that up (ExecOpenScanRelation could do it with minor
> API changes...).

Sorry, I don't clearly understand what you mean, but that (the idea of
expecting the core code to set it up) sounds inconsistent with your comment
on the earlier version of the API, "BeginForeignFetch" [1].

> I'm also a bit tempted to pass the TIDs to LockForeignRow and
> FetchForeignRow as Datums not ItemPointers.  We have the Datum format
> available already at the call sites, so this is free as far as the core
> code is concerned, and would only cost another line or so for the FDWs.
> This is by no means sufficient to allow FDWs to use some other type than
> "tid" for row identifiers; but it would be a down payment on that problem,
> and at least would avoid nailing the rowids-are-tids assumption into yet
> another global API.

That is a good idea.

> Also, as I mentioned, I'd be a whole lot happier if we had a way to test
> this...

Attached is the postgres_fdw patch that I used for testing.  If you
try it, edit postgresGetForeignRowMarkType as necessary.  I have to
confess that I tested only under normal conditions with this patch.

Sorry for the delay.  I was on vacation until yesterday.

Best regards,
Etsuro Fujita

[1] http://www.postgresql.org/message-id/14504.1428446...@sss.pgh.pa.us
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 88,93 **** typedef struct PgFdwRelationInfo
--- 88,95 ----
   *
   * 1) SELECT statement text to be sent to the remote server
   * 2) Integer list of attribute numbers retrieved by the SELECT
+  * 3) Optionally, SELECT statement text used to re-fetch/lock one row by ctid
+  * 4) Integer list of attribute numbers retrieved by that second SELECT
   *
   * These items are indexed with the enum FdwScanPrivateIndex, so an item
   * can be fetched with list_nth().  For example, to get the SELECT statement:
***************
*** 98,104 **** enum FdwScanPrivateIndex
  	/* SQL statement to execute remotely (as a String node) */
  	FdwScanPrivateSelectSql,
  	/* Integer list of attribute numbers retrieved by the SELECT */
! 	FdwScanPrivateRetrievedAttrs
  };
  
  /*
--- 100,110 ----
  	/* SQL statement to execute remotely (as a String node) */
  	FdwScanPrivateSelectSql,
  	/* Integer list of attribute numbers retrieved by the SELECT */
! 	FdwScanPrivateRetrievedAttrs,
! 	/* SQL statement to re-fetch/lock one row by ctid (as a String node) */
! 	FdwScanPrivateSelectSql2,
! 	/* Integer list of attribute numbers retrieved by that second SELECT */
! 	FdwScanPrivateRetrievedAttrs2
  };
  
  /*
***************
*** 186,191 **** typedef struct PgFdwModifyState
--- 192,223 ----
  } PgFdwModifyState;
  
  /*
+  * Execution state for fetching/locking foreign rows.
+  */
+ typedef struct PgFdwFetchState
+ {
+ 	Relation	rel;			/* relcache entry for the foreign table */
+ 	AttInMetadata *attinmeta;	/* attribute datatype conversion metadata */
+ 
+ 	/* for remote query execution */
+ 	PGconn	   *conn;			/* connection for the fetch */
+ 	char	   *p_name;			/* name of prepared statement, if created */
+ 
+ 	/* extracted fdw_private data */
+ 	char	   *query;			/* text of the by-ctid SELECT command */
+ 	List	   *retrieved_attrs;	/* attr numbers retrieved by SELECT */
+ 
+ 	/* info about parameters for prepared statement */
+ 	int			p_nums;			/* number of parameters to transmit */
+ 	FmgrInfo   *p_flinfo;		/* output conversion functions for them */
+ 
+ 	HeapTuple	locked_tuple;	/* tuple saved by LockForeignRow for FetchForeignRow */
+ 
+ 	/* working memory context */
+ 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
+ } PgFdwFetchState;
+ 
+ /*
   * Workspace for analyzing a foreign table.
   */
  typedef struct PgFdwAnalyzeState
***************
*** 276,281 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate,
--- 308,320 ----
  static void postgresEndForeignModify(EState *estate,
  						 ResultRelInfo *resultRelInfo);
  static int	postgresIsForeignRelUpdatable(Relation rel);
+ static RowMarkType postgresGetForeignRowMarkType(LockClauseStrength strength);
+ static bool postgresLockForeignRow(EState *estate,
+ 								   ExecRowMark *erm,
+ 								   ItemPointer tupleid);
+ static HeapTuple postgresFetchForeignRow(EState *estate,
+ 										 ExecRowMark *erm,
+ 										 ItemPointer tupleid);
  static void postgresExplainForeignScan(ForeignScanState *node,
  						   ExplainState *es);
  static void postgresExplainForeignModify(ModifyTableState *mtstate,
***************
*** 306,320 **** static void get_remote_estimate(const char *sql,
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static void prepare_foreign_modify(PgFdwModifyState *fmstate);
! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  							  HeapTuple *rows, int targrows,
  							  double *totalrows,
--- 345,370 ----
  static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
  						  EquivalenceClass *ec, EquivalenceMember *em,
  						  void *arg);
+ static List *create_foreign_fetch_info(PlannerInfo *root,
+ 									   RelOptInfo *baserel,
+ 									   RowMarkType markType);
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
! static char *setup_prep_stmt(PGconn *conn, char *query);
! static const char **convert_prep_stmt_params(ItemPointer tupleid,
! 											 TupleTableSlot *slot,
! 											 int p_nums,
! 											 FmgrInfo *p_flinfo,
! 											 List *target_attrs,
! 											 MemoryContext temp_context);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
+ static void init_foreign_fetch_state(EState *estate,
+ 									 ExecRowMark *erm,
+ 									 List *fdw_private,
+ 									 int eflags);
+ static void finish_foreign_fetch_state(EState *estate, ExecRowMark *erm);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
  							  HeapTuple *rows, int targrows,
  							  double *totalrows,
***************
*** 358,363 **** postgres_fdw_handler(PG_FUNCTION_ARGS)
--- 408,418 ----
  	routine->EndForeignModify = postgresEndForeignModify;
  	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
  
+ 	/* Functions for SELECT FOR UPDATE/SHARE row locking */
+ 	routine->GetForeignRowMarkType = postgresGetForeignRowMarkType;
+ 	routine->LockForeignRow = postgresLockForeignRow;
+ 	routine->FetchForeignRow = postgresFetchForeignRow;
+ 
  	/* Support functions for EXPLAIN */
  	routine->ExplainForeignScan = postgresExplainForeignScan;
  	routine->ExplainForeignModify = postgresExplainForeignModify;
***************
*** 746,751 **** postgresGetForeignPlan(PlannerInfo *root,
--- 801,807 ----
  	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
  	Index		scan_relid = baserel->relid;
  	List	   *fdw_private;
+ 	List	   *fdw_private2 = NIL;
  	List	   *remote_conds = NIL;
  	List	   *local_exprs = NIL;
  	List	   *params_list = NIL;
***************
*** 836,855 **** postgresGetForeignPlan(PlannerInfo *root,
  			 * complete information about, and (b) it wouldn't work anyway on
  			 * older remote servers.  Likewise, we don't worry about NOWAIT.
  			 */
! 			switch (rc->strength)
  			{
! 				case LCS_NONE:
! 					/* No locking needed */
! 					break;
! 				case LCS_FORKEYSHARE:
! 				case LCS_FORSHARE:
! 					appendStringInfoString(&sql, " FOR SHARE");
! 					break;
! 				case LCS_FORNOKEYUPDATE:
! 				case LCS_FORUPDATE:
! 					appendStringInfoString(&sql, " FOR UPDATE");
! 					break;
  			}
  		}
  	}
  
--- 892,917 ----
  			 * complete information about, and (b) it wouldn't work anyway on
  			 * older remote servers.  Likewise, we don't worry about NOWAIT.
  			 */
! 			if (rc->markType == ROW_MARK_COPY)
  			{
! 				switch (rc->strength)
! 				{
! 					case LCS_NONE:
! 						/* No locking needed */
! 						break;
! 					case LCS_FORKEYSHARE:
! 					case LCS_FORSHARE:
! 						appendStringInfoString(&sql, " FOR SHARE");
! 						break;
! 					case LCS_FORNOKEYUPDATE:
! 					case LCS_FORUPDATE:
! 						appendStringInfoString(&sql, " FOR UPDATE");
! 						break;
! 				}
  			}
+ 			else
+ 				fdw_private2 = create_foreign_fetch_info(root, baserel,
+ 														 rc->markType);
  		}
  	}
  
***************
*** 859,864 **** postgresGetForeignPlan(PlannerInfo *root,
--- 921,928 ----
  	 */
  	fdw_private = list_make2(makeString(sql.data),
  							 retrieved_attrs);
+ 	if (fdw_private2)
+ 		fdw_private = list_concat(fdw_private, fdw_private2);
  
  	/*
  	 * Create the ForeignScan node from target list, local filtering
***************
*** 886,891 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 950,956 ----
  	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate;
  	RangeTblEntry *rte;
+ 	ExecRowMark *erm;
  	Oid			userid;
  	ForeignTable *table;
  	ForeignServer *server;
***************
*** 986,991 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1051,1063 ----
  		fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
  	else
  		fsstate->param_values = NULL;
+ 
+ 	/*
+ 	 * Initialize state for fetching/locking foreign rows if needed.
+ 	 */
+ 	erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 	if (erm && erm->relation && erm->fdw_state == NULL)
+ 		init_foreign_fetch_state(estate, erm, fsplan->fdw_private, eflags);
  }
  
  /*
***************
*** 1093,1099 **** postgresReScanForeignScan(ForeignScanState *node)
--- 1165,1174 ----
  static void
  postgresEndForeignScan(ForeignScanState *node)
  {
+ 	ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ 	EState	   *estate = node->ss.ps.state;
  	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ 	ExecRowMark *erm;
  
  	/* if fsstate is NULL, we are in EXPLAIN; nothing to do */
  	if (fsstate == NULL)
***************
*** 1107,1112 **** postgresEndForeignScan(ForeignScanState *node)
--- 1182,1194 ----
  	ReleaseConnection(fsstate->conn);
  	fsstate->conn = NULL;
  
+ 	/*
+ 	 * Finish state for fetching/locking foreign rows if needed.
+ 	 */
+ 	erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true);
+ 	if (erm && erm->relation && erm->fdw_state != NULL)
+ 		finish_foreign_fetch_state(estate, erm);
+ 
  	/* MemoryContexts will be deleted automatically. */
  }
  
***************
*** 1391,1400 **** postgresExecForeignInsert(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1473,1486 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(NULL, slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1451,1457 **** postgresExecForeignUpdate(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1537,1543 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
***************
*** 1462,1470 **** postgresExecForeignUpdate(EState *estate,
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										slot);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1548,1559 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										slot,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1521,1527 **** postgresExecForeignDelete(EState *estate,
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		prepare_foreign_modify(fmstate);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
--- 1610,1616 ----
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
  	if (!fmstate->p_name)
! 		fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query);
  
  	/* Get the ctid that was passed up as a resjunk column */
  	datum = ExecGetJunkAttribute(planSlot,
***************
*** 1532,1540 **** postgresExecForeignDelete(EState *estate,
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params(fmstate,
! 										(ItemPointer) DatumGetPointer(datum),
! 										NULL);
  
  	/*
  	 * Execute the prepared statement, and check for success.
--- 1621,1632 ----
  		elog(ERROR, "ctid is NULL");
  
  	/* Convert parameters needed by prepared statement to text form */
! 	p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum),
! 										NULL,
! 										fmstate->p_nums,
! 										fmstate->p_flinfo,
! 										fmstate->target_attrs,
! 										fmstate->temp_cxt);
  
  	/*
  	 * Execute the prepared statement, and check for success.
***************
*** 1656,1661 **** postgresIsForeignRelUpdatable(Relation rel)
--- 1748,1927 ----
  }
  
  /*
+  * postgresGetForeignRowMarkType
+  *		Get rowmark type that we use for a given LockClauseStrength value.
+  */
+ static RowMarkType
+ postgresGetForeignRowMarkType(LockClauseStrength strength)
+ {
+ 	/* To test the ROW_MARK_COPY path instead: return ROW_MARK_COPY; */
+ 	switch (strength)
+ 	{
+ 		case LCS_NONE:			/* no lock requested: re-fetch only */
+ 			return ROW_MARK_REFERENCE;
+ 		case LCS_FORKEYSHARE:
+ 			return ROW_MARK_KEYSHARE;
+ 		case LCS_FORSHARE:
+ 			return ROW_MARK_SHARE;
+ 		case LCS_FORNOKEYUPDATE:
+ 			return ROW_MARK_NOKEYEXCLUSIVE;
+ 		case LCS_FORUPDATE:
+ 			return ROW_MARK_EXCLUSIVE;
+ 	}
+ 	return ROW_MARK_COPY;		/* shouldn't happen */
+ }
+ 
+ /*
+  * postgresLockForeignRow
+  *		Lock one tuple in a foreign table by re-selecting it by ctid; the fetched tuple is kept in the fetch state, and true is returned on every non-error path
+  */
+ static bool
+ postgresLockForeignRow(EState *estate,
+ 					   ExecRowMark *erm,
+ 					   ItemPointer tupleid)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ 	const char **p_values;
+ 	PGresult   *res;
+ 	HeapTuple	tuple;
+ 
+ 	ffstate->locked_tuple = NULL;	/* forget any previously remembered tuple */
+ 
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!ffstate->p_name)
+ 		ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+ 
+ 	/* Convert parameters needed by prepared statement to text form */
+ 	p_values = convert_prep_stmt_params(tupleid, NULL,
+ 										ffstate->p_nums,
+ 										ffstate->p_flinfo,
+ 										NIL,
+ 										ffstate->temp_cxt);
+ 
+ 	/*
+ 	 * Execute the prepared statement, and check for success.
+ 	 *
+ 	 * The PQexecPrepared call is not covered by the PG_TRY block below, so
+ 	 * be careful not to throw error without releasing the PGresult.
+ 	 */
+ 	res = PQexecPrepared(ffstate->conn,
+ 						 ffstate->p_name,
+ 						 ffstate->p_nums,
+ 						 p_values,
+ 						 NULL,
+ 						 NULL,
+ 						 0);
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+ 
+ 	/* PGresult must be released before leaving this function. */
+ 	PG_TRY();
+ 	{
+ 		/* Create the tuple from row 0; NOTE(review): PQntuples(res) is not checked -- confirm a row is always returned */
+ 		tuple = make_tuple_from_result_row(res, 0,
+ 										   ffstate->rel,
+ 										   ffstate->attinmeta,
+ 										   ffstate->retrieved_attrs,
+ 										   ffstate->temp_cxt);
+ 		tuple->t_self = *tupleid;	/* stamp the tuple with the requested row id */
+ 		tuple->t_tableOid = erm->relid;
+ 
+ 		PQclear(res);
+ 		res = NULL;				/* so the catch block does not clear it again */
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		if (res)
+ 			PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 
+ 	MemoryContextReset(ffstate->temp_cxt);	/* discard per-call temporary data */
+ 
+ 	/* Remember locked tuple so postgresFetchForeignRow can return it */
+ 	ffstate->locked_tuple = tuple;
+ 
+ 	/* Got the lock successfully */
+ 	return true;
+ }
+ 
+ /*
+  * postgresFetchForeignRow
+  *		Fetch one tuple from a foreign table: return the tuple remembered by postgresLockForeignRow if the row mark takes a lock, else re-select it by ctid
+  */
+ static HeapTuple
+ postgresFetchForeignRow(EState *estate,
+ 						ExecRowMark *erm,
+ 						ItemPointer tupleid)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ 	const char **p_values;
+ 	PGresult   *res;
+ 	HeapTuple	tuple;
+ 
+ 	if (RowMarkRequiresRowShareLock(erm->markType))	/* locking mark: row was already fetched while locking */
+ 	{
+ 		Assert(ffstate->locked_tuple);
+ 		return ffstate->locked_tuple;
+ 	}
+ 
+ 	/* Set up the prepared statement on the remote server, if we didn't yet */
+ 	if (!ffstate->p_name)
+ 		ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query);
+ 
+ 	/* Convert parameters needed by prepared statement to text form */
+ 	p_values = convert_prep_stmt_params(tupleid, NULL,
+ 										ffstate->p_nums,
+ 										ffstate->p_flinfo,
+ 										NIL,
+ 										ffstate->temp_cxt);
+ 
+ 	/*
+ 	 * Execute the prepared statement, and check for success.
+ 	 *
+ 	 * The PQexecPrepared call is not covered by the PG_TRY block below, so
+ 	 * be careful not to throw error without releasing the PGresult.
+ 	 */
+ 	res = PQexecPrepared(ffstate->conn,
+ 						 ffstate->p_name,
+ 						 ffstate->p_nums,
+ 						 p_values,
+ 						 NULL,
+ 						 NULL,
+ 						 0);
+ 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ 		pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query);
+ 
+ 	/* PGresult must be released before leaving this function. */
+ 	PG_TRY();
+ 	{
+ 		/* Create the tuple from row 0; NOTE(review): PQntuples(res) is not checked -- confirm a row is always returned */
+ 		tuple = make_tuple_from_result_row(res, 0,
+ 										   ffstate->rel,
+ 										   ffstate->attinmeta,
+ 										   ffstate->retrieved_attrs,
+ 										   ffstate->temp_cxt);
+ 		tuple->t_self = *tupleid;	/* stamp the tuple with the requested row id */
+ 		tuple->t_tableOid = erm->relid;
+ 
+ 		PQclear(res);
+ 		res = NULL;				/* so the catch block does not clear it again */
+ 	}
+ 	PG_CATCH();
+ 	{
+ 		if (res)
+ 			PQclear(res);
+ 		PG_RE_THROW();
+ 	}
+ 	PG_END_TRY();
+ 
+ 	MemoryContextReset(ffstate->temp_cxt);	/* discard per-call temporary data */
+ 
+ 	return tuple;
+ }
+ 
+ /*
   * postgresExplainForeignScan
   *		Produce extra output for EXPLAIN of a ForeignScan on a foreign table
   */
***************
*** 1918,1923 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 2184,2232 ----
  }
  
  /*
+  * Create the FDW-private information for fetching/locking foreign rows.
+  */
+ static List *
+ create_foreign_fetch_info(PlannerInfo *root,
+ 						  RelOptInfo *baserel,
+ 						  RowMarkType markType)
+ {
+ 	StringInfoData sql;
+ 	List	   *retrieved_attrs;
+ 	Bitmapset  *attrs_used = NULL;
+ 
+ 	/*
+ 	 * Build the query string to be sent for execution.
+ 	 */
+ 	initStringInfo(&sql);
+ 	/* Add a whole-row var to attrs_used to retrieve all the columns. */
+ 	attrs_used = bms_add_member(attrs_used,
+ 								0 - FirstLowInvalidHeapAttributeNumber);
+ 	deparseSelectSql(&sql, root, baserel, attrs_used, &retrieved_attrs);
+ 	appendStringInfoString(&sql, " WHERE ctid = $1");	/* $1 is the row's ctid, bound at execution */
+ 
+ 	switch (markType)			/* choose the remote locking clause, if any */
+ 	{
+ 		case ROW_MARK_EXCLUSIVE:
+ 		case ROW_MARK_NOKEYEXCLUSIVE:
+ 			appendStringInfoString(&sql, " FOR UPDATE");
+ 			break;
+ 		case ROW_MARK_SHARE:
+ 		case ROW_MARK_KEYSHARE:
+ 			appendStringInfoString(&sql, " FOR SHARE");
+ 			break;
+ 		default:				/* other mark types (e.g. ROW_MARK_REFERENCE): no locking clause */
+ 			break;
+ 	}
+ 
+ 	/*
+ 	 * Build the fdw_private list that will be available to the executor.
+ 	 * Items match FdwScanPrivateSelectSql2/RetrievedAttrs2 once appended.
+ 	 */
+ 	return list_make2(makeString(sql.data), retrieved_attrs);
+ }
+ 
+ /*
   * Create cursor for node's query with current parameter values.
   */
  static void
***************
*** 2154,2164 **** close_cursor(PGconn *conn, unsigned int cursor_number)
  }
  
  /*
!  * prepare_foreign_modify
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
   */
! static void
! prepare_foreign_modify(PgFdwModifyState *fmstate)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
--- 2463,2474 ----
  }
  
  /*
!  * setup_prep_stmt
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+  *		or re-fetching tuples for EvalPlanQual rechecking
   */
! static char *
! setup_prep_stmt(PGconn *conn, char *query)
  {
  	char		prep_name[NAMEDATALEN];
  	char	   *p_name;
***************
*** 2166,2172 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(fmstate->conn));
  	p_name = pstrdup(prep_name);
  
  	/*
--- 2476,2482 ----
  
  	/* Construct name we'll use for the prepared statement. */
  	snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
! 			 GetPrepStmtNumber(conn));
  	p_name = pstrdup(prep_name);
  
  	/*
***************
*** 2179,2196 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(fmstate->conn,
! 					p_name,
! 					fmstate->query,
! 					0,
! 					NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	fmstate->p_name = p_name;
  }
  
  /*
--- 2489,2502 ----
  	 * We don't use a PG_TRY block here, so be careful not to throw error
  	 * without releasing the PGresult.
  	 */
! 	res = PQprepare(conn, p_name, query, 0, NULL);
  
  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 		pgfdw_report_error(ERROR, res, conn, true, query);
  	PQclear(res);
  
  	/* This action shows that the prepare has been done. */
! 	return p_name;
  }
  
  /*
***************
*** 2203,2238 **** prepare_foreign_modify(PgFdwModifyState *fmstate)
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(PgFdwModifyState *fmstate,
! 						 ItemPointer tupleid,
! 						 TupleTableSlot *slot)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
  
! 	p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && fmstate->target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, fmstate->target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
--- 2509,2547 ----
   * Data is constructed in temp_cxt; caller should reset that after use.
   */
  static const char **
! convert_prep_stmt_params(ItemPointer tupleid,
! 						 TupleTableSlot *slot,
! 						 int p_nums,
! 						 FmgrInfo *p_flinfo,
! 						 List *target_attrs,
! 						 MemoryContext temp_context)
  {
  	const char **p_values;
  	int			pindex = 0;
  	MemoryContext oldcontext;
  
! 	oldcontext = MemoryContextSwitchTo(temp_context);
  
! 	p_values = (const char **) palloc(sizeof(char *) * p_nums);
  
  	/* 1st parameter should be ctid, if it's in use */
  	if (tupleid != NULL)
  	{
  		/* don't need set_transmission_modes for TID output */
! 		p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  											  PointerGetDatum(tupleid));
  		pindex++;
  	}
  
  	/* get following parameters from slot */
! 	if (slot != NULL && target_attrs != NIL)
  	{
  		int			nestlevel;
  		ListCell   *lc;
  
  		nestlevel = set_transmission_modes();
  
! 		foreach(lc, target_attrs)
  		{
  			int			attnum = lfirst_int(lc);
  			Datum		value;
***************
*** 2242,2248 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
--- 2551,2557 ----
  			if (isnull)
  				p_values[pindex] = NULL;
  			else
! 				p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex],
  													  value);
  			pindex++;
  		}
***************
*** 2250,2256 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == fmstate->p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
--- 2559,2565 ----
  		reset_transmission_modes(nestlevel);
  	}
  
! 	Assert(pindex == p_nums);
  
  	MemoryContextSwitchTo(oldcontext);
  
***************
*** 2290,2295 **** store_returning_result(PgFdwModifyState *fmstate,
--- 2599,2705 ----
  }
  
  /*
+  * init_foreign_fetch_state
+  *		Initialize an execution state for fetching/locking foreign rows
+  */
+ static void
+ init_foreign_fetch_state(EState *estate,
+ 						 ExecRowMark *erm,
+ 						 List *fdw_private,
+ 						 int eflags)
+ {
+ 	PgFdwFetchState *ffstate;
+ 	Relation	rel = erm->relation;
+ 	RangeTblEntry *rte;
+ 	Oid			userid;
+ 	ForeignTable *table;
+ 	ForeignServer *server;
+ 	UserMapping *user;
+ 	Oid			typefnoid;
+ 	bool		isvarlena;
+ 
+ 	/* Begin constructing PgFdwFetchState.  (eflags is currently unused.) */
+ 	ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState));
+ 	ffstate->rel = rel;
+ 
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match what
+ 	 * ExecCheckRTEPerms() does.
+ 	 */
+ 	rte = rt_fetch(erm->rti, estate->es_range_table);
+ 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ 
+ 	/* Get info about foreign table. */
+ 	table = GetForeignTable(RelationGetRelid(rel));
+ 	server = GetForeignServer(table->serverid);
+ 	user = GetUserMapping(userid, server->serverid);
+ 
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	ffstate->conn = GetConnection(server, user, true);
+ 	ffstate->p_name = NULL;		/* prepared statement not made yet */
+ 
+ 	/* Deconstruct fdw_private data (the second query/attr-list pair). */
+ 	ffstate->query = strVal(list_nth(fdw_private,
+ 									 FdwScanPrivateSelectSql2));
+ 	ffstate->retrieved_attrs = (List *) list_nth(fdw_private,
+ 												 FdwScanPrivateRetrievedAttrs2);
+ 
+ 	/* Create context for per-tuple temp workspace. */
+ 	ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											  "postgres_fdw temporary data",
+ 											  ALLOCSET_SMALL_MINSIZE,
+ 											  ALLOCSET_SMALL_INITSIZE,
+ 											  ALLOCSET_SMALL_MAXSIZE);
+ 
+ 	/* Prepare for input conversion of SELECT results. */
+ 	ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+ 
+ 	/* Prepare for output conversion of parameters used in prepared stmt. */
+ 	ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));	/* room for exactly one parameter */
+ 	ffstate->p_nums = 0;
+ 
+ 	/* The only transmittable parameter is the ctid */
+ 	getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ 	fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]);
+ 	ffstate->p_nums++;
+ 
+ 	erm->fdw_state = ffstate;	/* read back by the lock/fetch callbacks and at scan end */
+ }
+ 
+ /*
+  * finish_foreign_fetch_state
+  *		Finish an execution state for fetching/locking foreign rows: deallocate the remote prepared statement, if any, and release the connection
+  */
+ static void
+ finish_foreign_fetch_state(EState *estate, ExecRowMark *erm)
+ {
+ 	PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state;
+ 
+ 	/* If we created a prepared statement, destroy it */
+ 	if (ffstate->p_name)
+ 	{
+ 		char		sql[64];
+ 		PGresult   *res;
+ 
+ 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name);
+ 
+ 		/*
+ 		 * We don't use a PG_TRY block here, so be careful not to throw error
+ 		 * without releasing the PGresult.
+ 		 */
+ 		res = PQexec(ffstate->conn, sql);
+ 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, res, ffstate->conn, true, sql);
+ 		PQclear(res);
+ 		ffstate->p_name = NULL;
+ 	}
+ 
+ 	/* Release remote connection */
+ 	ReleaseConnection(ffstate->conn);
+ 	ffstate->conn = NULL;		/* NOTE(review): erm->fdw_state is not cleared here -- confirm intended */
+ }
+ 
+ /*
   * postgresAnalyzeForeignTable
   *		Test whether analyzing this foreign table is supported
   */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to