On 2015/05/11 8:50, Tom Lane wrote: > Etsuro Fujita <fujita.ets...@lab.ntt.co.jp> writes: >> [ EvalPlanQual-v6.patch ] > > I've started to study this in a little more detail, and I'm not terribly > happy with some of the API decisions in it.
Thanks for taking the time to review the patch! > In particular, I find the addition of "void *fdw_state" to ExecRowMark > to be pretty questionable. That does not seem like a good place to keep > random state. (I realize that WHERE CURRENT OF keeps some state in > ExecRowMark, but that's a crock not something to emulate.) ISTM that in > most scenarios, the state that LockForeignRow/FetchForeignRow would like > to get at is probably the FDW state associated with the ForeignScanState > that the tuple came from. Which this API doesn't help with particularly. > I wonder if we should instead add a "ScanState*" field and expect the > core code to set that up (ExecOpenScanRelation could do it with minor > API changes...). Sorry, I don't clearly understand what you mean, but that (the idea of expecting the core to set it up) sounds inconsistent with your comment on the earlier version of the API "BeginForeignFetch" [1]. > I'm also a bit tempted to pass the TIDs to LockForeignRow and > FetchForeignRow as Datums not ItemPointers. We have the Datum format > available already at the call sites, so this is free as far as the core > code is concerned, and would only cost another line or so for the FDWs. > This is by no means sufficient to allow FDWs to use some other type than > "tid" for row identifiers; but it would be a down payment on that problem, > and at least would avoid nailing the rowids-are-tids assumption into yet > another global API. That is a good idea. > Also, as I mentioned, I'd be a whole lot happier if we had a way to test > this... Attached is a postgres_fdw patch that I used for testing. If you try it, edit postgresGetForeignRowMarkType as necessary. I have to confess that I only tested normal conditions with the patch. Sorry for the delay. I was on vacation until yesterday. Best regards, Etsuro Fujita [1] http://www.postgresql.org/message-id/14504.1428446...@sss.pgh.pa.us
*** a/contrib/postgres_fdw/postgres_fdw.c --- b/contrib/postgres_fdw/postgres_fdw.c *************** *** 88,93 **** typedef struct PgFdwRelationInfo --- 88,95 ---- * * 1) SELECT statement text to be sent to the remote server * 2) Integer list of attribute numbers retrieved by the SELECT + * 3) SELECT statement text to be sent to the remote server + * 4) Integer list of attribute numbers retrieved by the SELECT * * These items are indexed with the enum FdwScanPrivateIndex, so an item * can be fetched with list_nth(). For example, to get the SELECT statement: *************** *** 98,104 **** enum FdwScanPrivateIndex /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, /* Integer list of attribute numbers retrieved by the SELECT */ ! FdwScanPrivateRetrievedAttrs }; /* --- 100,110 ---- /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, /* Integer list of attribute numbers retrieved by the SELECT */ ! FdwScanPrivateRetrievedAttrs, ! /* SQL statement to execute remotely (as a String node) */ ! FdwScanPrivateSelectSql2, ! /* Integer list of attribute numbers retrieved by SELECT */ ! FdwScanPrivateRetrievedAttrs2 }; /* *************** *** 186,191 **** typedef struct PgFdwModifyState --- 192,223 ---- } PgFdwModifyState; /* + * Execution state for fetching/locking foreign rows. 
+ */ + typedef struct PgFdwFetchState + { + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* for remote query execution */ + PGconn *conn; /* connection for the fetch */ + char *p_name; /* name of prepared statement, if created */ + + /* extracted fdw_private data */ + char *query; /* text of SELECT command */ + List *retrieved_attrs; /* attr numbers retrieved by SELECT */ + + /* info about parameters for prepared statement */ + int p_nums; /* number of parameters to transmit */ + FmgrInfo *p_flinfo; /* output conversion functions for them */ + + HeapTuple locked_tuple; + + /* working memory context */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ + } PgFdwFetchState; + + /* * Workspace for analyzing a foreign table. */ typedef struct PgFdwAnalyzeState *************** *** 276,281 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate, --- 308,320 ---- static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); static int postgresIsForeignRelUpdatable(Relation rel); + static RowMarkType postgresGetForeignRowMarkType(LockClauseStrength strength); + static bool postgresLockForeignRow(EState *estate, + ExecRowMark *erm, + ItemPointer tupleid); + static HeapTuple postgresFetchForeignRow(EState *estate, + ExecRowMark *erm, + ItemPointer tupleid); static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es); static void postgresExplainForeignModify(ModifyTableState *mtstate, *************** *** 306,320 **** static void get_remote_estimate(const char *sql, static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); ! 
static void prepare_foreign_modify(PgFdwModifyState *fmstate); ! static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ! ItemPointer tupleid, ! TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, --- 345,370 ---- static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); + static List *create_foreign_fetch_info(PlannerInfo *root, + RelOptInfo *baserel, + RowMarkType markType); static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); ! static char *setup_prep_stmt(PGconn *conn, char *query); ! static const char **convert_prep_stmt_params(ItemPointer tupleid, ! TupleTableSlot *slot, ! int p_nums, ! FmgrInfo *p_flinfo, ! List *target_attrs, ! 
MemoryContext temp_context); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); + static void init_foreign_fetch_state(EState *estate, + ExecRowMark *erm, + List *fdw_private, + int eflags); + static void finish_foreign_fetch_state(EState *estate, ExecRowMark *erm); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, *************** *** 358,363 **** postgres_fdw_handler(PG_FUNCTION_ARGS) --- 408,418 ---- routine->EndForeignModify = postgresEndForeignModify; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; + /* Functions for SELECT FOR UPDATE/SHARE row locking */ + routine->GetForeignRowMarkType = postgresGetForeignRowMarkType; + routine->LockForeignRow = postgresLockForeignRow; + routine->FetchForeignRow = postgresFetchForeignRow; + /* Support functions for EXPLAIN */ routine->ExplainForeignScan = postgresExplainForeignScan; routine->ExplainForeignModify = postgresExplainForeignModify; *************** *** 746,751 **** postgresGetForeignPlan(PlannerInfo *root, --- 801,807 ---- PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; Index scan_relid = baserel->relid; List *fdw_private; + List *fdw_private2 = NIL; List *remote_conds = NIL; List *local_exprs = NIL; List *params_list = NIL; *************** *** 836,855 **** postgresGetForeignPlan(PlannerInfo *root, * complete information about, and (b) it wouldn't work anyway on * older remote servers. Likewise, we don't worry about NOWAIT. */ ! switch (rc->strength) { ! case LCS_NONE: ! /* No locking needed */ ! break; ! case LCS_FORKEYSHARE: ! case LCS_FORSHARE: ! appendStringInfoString(&sql, " FOR SHARE"); ! break; ! case LCS_FORNOKEYUPDATE: ! case LCS_FORUPDATE: ! appendStringInfoString(&sql, " FOR UPDATE"); ! break; } } } --- 892,917 ---- * complete information about, and (b) it wouldn't work anyway on * older remote servers. Likewise, we don't worry about NOWAIT. */ ! 
if (rc->markType == ROW_MARK_COPY) { ! switch (rc->strength) ! { ! case LCS_NONE: ! /* No locking needed */ ! break; ! case LCS_FORKEYSHARE: ! case LCS_FORSHARE: ! appendStringInfoString(&sql, " FOR SHARE"); ! break; ! case LCS_FORNOKEYUPDATE: ! case LCS_FORUPDATE: ! appendStringInfoString(&sql, " FOR UPDATE"); ! break; ! } } + else + fdw_private2 = create_foreign_fetch_info(root, baserel, + rc->markType); } } *************** *** 859,864 **** postgresGetForeignPlan(PlannerInfo *root, --- 921,928 ---- */ fdw_private = list_make2(makeString(sql.data), retrieved_attrs); + if (fdw_private2) + fdw_private = list_concat(fdw_private, fdw_private2); /* * Create the ForeignScan node from target list, local filtering *************** *** 886,891 **** postgresBeginForeignScan(ForeignScanState *node, int eflags) --- 950,956 ---- EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; RangeTblEntry *rte; + ExecRowMark *erm; Oid userid; ForeignTable *table; ForeignServer *server; *************** *** 986,991 **** postgresBeginForeignScan(ForeignScanState *node, int eflags) --- 1051,1063 ---- fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); else fsstate->param_values = NULL; + + /* + * Initialize state for fetching/locking foreign rows if needed. 
+ */ + erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true); + if (erm && erm->relation && erm->fdw_state == NULL) + init_foreign_fetch_state(estate, erm, fsplan->fdw_private, eflags); } /* *************** *** 1093,1099 **** postgresReScanForeignScan(ForeignScanState *node) --- 1165,1174 ---- static void postgresEndForeignScan(ForeignScanState *node) { + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + ExecRowMark *erm; /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ if (fsstate == NULL) *************** *** 1107,1112 **** postgresEndForeignScan(ForeignScanState *node) --- 1182,1194 ---- ReleaseConnection(fsstate->conn); fsstate->conn = NULL; + /* + * Finish state for fetching/locking foreign rows if needed. + */ + erm = ExecFindRowMark(estate, fsplan->scan.scanrelid, true); + if (erm && erm->relation && erm->fdw_state != NULL) + finish_foreign_fetch_state(estate, erm); + /* MemoryContexts will be deleted automatically. */ } *************** *** 1391,1400 **** postgresExecForeignInsert(EState *estate, /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) ! prepare_foreign_modify(fmstate); /* Convert parameters needed by prepared statement to text form */ ! p_values = convert_prep_stmt_params(fmstate, NULL, slot); /* * Execute the prepared statement, and check for success. --- 1473,1486 ---- /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) ! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query); /* Convert parameters needed by prepared statement to text form */ ! p_values = convert_prep_stmt_params(NULL, slot, ! fmstate->p_nums, ! fmstate->p_flinfo, ! fmstate->target_attrs, ! fmstate->temp_cxt); /* * Execute the prepared statement, and check for success. 
*************** *** 1451,1457 **** postgresExecForeignUpdate(EState *estate, /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) ! prepare_foreign_modify(fmstate); /* Get the ctid that was passed up as a resjunk column */ datum = ExecGetJunkAttribute(planSlot, --- 1537,1543 ---- /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) ! fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query); /* Get the ctid that was passed up as a resjunk column */ datum = ExecGetJunkAttribute(planSlot, *************** *** 1462,1470 **** postgresExecForeignUpdate(EState *estate, elog(ERROR, "ctid is NULL"); /* Convert parameters needed by prepared statement to text form */ ! p_values = convert_prep_stmt_params(fmstate, ! (ItemPointer) DatumGetPointer(datum), ! slot); /* * Execute the prepared statement, and check for success. --- 1548,1559 ---- elog(ERROR, "ctid is NULL"); /* Convert parameters needed by prepared statement to text form */ ! p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum), ! slot, ! fmstate->p_nums, ! fmstate->p_flinfo, ! fmstate->target_attrs, ! fmstate->temp_cxt); /* * Execute the prepared statement, and check for success. *************** *** 1521,1527 **** postgresExecForeignDelete(EState *estate, /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) ! prepare_foreign_modify(fmstate); /* Get the ctid that was passed up as a resjunk column */ datum = ExecGetJunkAttribute(planSlot, --- 1610,1616 ---- /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) ! 
fmstate->p_name = setup_prep_stmt(fmstate->conn, fmstate->query); /* Get the ctid that was passed up as a resjunk column */ datum = ExecGetJunkAttribute(planSlot, *************** *** 1532,1540 **** postgresExecForeignDelete(EState *estate, elog(ERROR, "ctid is NULL"); /* Convert parameters needed by prepared statement to text form */ ! p_values = convert_prep_stmt_params(fmstate, ! (ItemPointer) DatumGetPointer(datum), ! NULL); /* * Execute the prepared statement, and check for success. --- 1621,1632 ---- elog(ERROR, "ctid is NULL"); /* Convert parameters needed by prepared statement to text form */ ! p_values = convert_prep_stmt_params((ItemPointer) DatumGetPointer(datum), ! NULL, ! fmstate->p_nums, ! fmstate->p_flinfo, ! fmstate->target_attrs, ! fmstate->temp_cxt); /* * Execute the prepared statement, and check for success. *************** *** 1656,1661 **** postgresIsForeignRelUpdatable(Relation rel) --- 1748,1927 ---- } /* + * postgresGetForeignRowMarkType + * Get rowmark type that we use for a given LockClauseStrength value. 
+ */ + static RowMarkType + postgresGetForeignRowMarkType(LockClauseStrength strength) + { + /* return ROW_MARK_COPY; */ + switch (strength) + { + case LCS_NONE: + return ROW_MARK_REFERENCE; + case LCS_FORKEYSHARE: + return ROW_MARK_KEYSHARE; + case LCS_FORSHARE: + return ROW_MARK_SHARE; + case LCS_FORNOKEYUPDATE: + return ROW_MARK_NOKEYEXCLUSIVE; + case LCS_FORUPDATE: + return ROW_MARK_EXCLUSIVE; + } + return ROW_MARK_COPY; /* shouldn't happen */ + } + + /* + * postgresLockForeignRow + * Lock one tuple in a foreign table + */ + static bool + postgresLockForeignRow(EState *estate, + ExecRowMark *erm, + ItemPointer tupleid) + { + PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state; + const char **p_values; + PGresult *res; + HeapTuple tuple; + + ffstate->locked_tuple = NULL; + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!ffstate->p_name) + ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(tupleid, NULL, + ffstate->p_nums, + ffstate->p_flinfo, + NIL, + ffstate->temp_cxt); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(ffstate->conn, + ffstate->p_name, + ffstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query); + + /* PGresult must be released before leaving this function. 
*/ + PG_TRY(); + { + /* Create the tuple */ + tuple = make_tuple_from_result_row(res, 0, + ffstate->rel, + ffstate->attinmeta, + ffstate->retrieved_attrs, + ffstate->temp_cxt); + tuple->t_self = *tupleid; + tuple->t_tableOid = erm->relid; + + PQclear(res); + res = NULL; + } + PG_CATCH(); + { + if (res) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); + + MemoryContextReset(ffstate->temp_cxt); + + /* Remember locked tuple for later processing */ + ffstate->locked_tuple = tuple; + + /* Got the lock successfully */ + return true; + } + + /* + * postgresFetchForeignRow + * Fetch one tuple from a foreign table + */ + static HeapTuple + postgresFetchForeignRow(EState *estate, + ExecRowMark *erm, + ItemPointer tupleid) + { + PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state; + const char **p_values; + PGresult *res; + HeapTuple tuple; + + if (RowMarkRequiresRowShareLock(erm->markType)) + { + Assert(ffstate->locked_tuple); + return ffstate->locked_tuple; + } + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!ffstate->p_name) + ffstate->p_name = setup_prep_stmt(ffstate->conn, ffstate->query); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(tupleid, NULL, + ffstate->p_nums, + ffstate->p_flinfo, + NIL, + ffstate->temp_cxt); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(ffstate->conn, + ffstate->p_name, + ffstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, ffstate->conn, true, ffstate->query); + + /* PGresult must be released before leaving this function. 
*/ + PG_TRY(); + { + /* Create the tuple */ + tuple = make_tuple_from_result_row(res, 0, + ffstate->rel, + ffstate->attinmeta, + ffstate->retrieved_attrs, + ffstate->temp_cxt); + tuple->t_self = *tupleid; + tuple->t_tableOid = erm->relid; + + PQclear(res); + res = NULL; + } + PG_CATCH(); + { + if (res) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); + + MemoryContextReset(ffstate->temp_cxt); + + return tuple; + } + + /* * postgresExplainForeignScan * Produce extra output for EXPLAIN of a ForeignScan on a foreign table */ *************** *** 1918,1923 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, --- 2184,2232 ---- } /* + * Create the FDW-private information for fetching/locking foreign rows. + */ + static List * + create_foreign_fetch_info(PlannerInfo *root, + RelOptInfo *baserel, + RowMarkType markType) + { + StringInfoData sql; + List *retrieved_attrs; + Bitmapset *attrs_used = NULL; + + /* + * Build the query string to be sent for execution. + */ + initStringInfo(&sql); + /* Add a whole-row var to attrs_used to retrieve all the columns. */ + attrs_used = bms_add_member(attrs_used, + 0 - FirstLowInvalidHeapAttributeNumber); + deparseSelectSql(&sql, root, baserel, attrs_used, &retrieved_attrs); + appendStringInfoString(&sql, " WHERE ctid = $1"); + + switch (markType) + { + case ROW_MARK_EXCLUSIVE: + case ROW_MARK_NOKEYEXCLUSIVE: + appendStringInfoString(&sql, " FOR UPDATE"); + break; + case ROW_MARK_SHARE: + case ROW_MARK_KEYSHARE: + appendStringInfoString(&sql, " FOR SHARE"); + break; + default: + break; + } + + /* + * Build the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwFetchPrivateIndex, above. + */ + return list_make2(makeString(sql.data), retrieved_attrs); + } + + /* * Create cursor for node's query with current parameter values. */ static void *************** *** 2154,2164 **** close_cursor(PGconn *conn, unsigned int cursor_number) } /* ! 
* prepare_foreign_modify * Establish a prepared statement for execution of INSERT/UPDATE/DELETE */ ! static void ! prepare_foreign_modify(PgFdwModifyState *fmstate) { char prep_name[NAMEDATALEN]; char *p_name; --- 2463,2474 ---- } /* ! * setup_prep_stmt * Establish a prepared statement for execution of INSERT/UPDATE/DELETE + * or re-fetching tuples for EvalPlanQual rechecking */ ! static char * ! setup_prep_stmt(PGconn *conn, char *query) { char prep_name[NAMEDATALEN]; char *p_name; *************** *** 2166,2172 **** prepare_foreign_modify(PgFdwModifyState *fmstate) /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", ! GetPrepStmtNumber(fmstate->conn)); p_name = pstrdup(prep_name); /* --- 2476,2482 ---- /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", ! GetPrepStmtNumber(conn)); p_name = pstrdup(prep_name); /* *************** *** 2179,2196 **** prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ ! res = PQprepare(fmstate->conn, ! p_name, ! fmstate->query, ! 0, ! NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) ! pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); PQclear(res); /* This action shows that the prepare has been done. */ ! fmstate->p_name = p_name; } /* --- 2489,2502 ---- * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ ! res = PQprepare(conn, p_name, query, 0, NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) ! pgfdw_report_error(ERROR, res, conn, true, query); PQclear(res); /* This action shows that the prepare has been done. */ ! return p_name; } /* *************** *** 2203,2238 **** prepare_foreign_modify(PgFdwModifyState *fmstate) * Data is constructed in temp_cxt; caller should reset that after use. */ static const char ** ! 
convert_prep_stmt_params(PgFdwModifyState *fmstate, ! ItemPointer tupleid, ! TupleTableSlot *slot) { const char **p_values; int pindex = 0; MemoryContext oldcontext; ! oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); ! p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { /* don't need set_transmission_modes for TID output */ ! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; } /* get following parameters from slot */ ! if (slot != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); ! foreach(lc, fmstate->target_attrs) { int attnum = lfirst_int(lc); Datum value; --- 2509,2547 ---- * Data is constructed in temp_cxt; caller should reset that after use. */ static const char ** ! convert_prep_stmt_params(ItemPointer tupleid, ! TupleTableSlot *slot, ! int p_nums, ! FmgrInfo *p_flinfo, ! List *target_attrs, ! MemoryContext temp_context) { const char **p_values; int pindex = 0; MemoryContext oldcontext; ! oldcontext = MemoryContextSwitchTo(temp_context); ! p_values = (const char **) palloc(sizeof(char *) * p_nums); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { /* don't need set_transmission_modes for TID output */ ! p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; } /* get following parameters from slot */ ! if (slot != NULL && target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); ! foreach(lc, target_attrs) { int attnum = lfirst_int(lc); Datum value; *************** *** 2242,2248 **** convert_prep_stmt_params(PgFdwModifyState *fmstate, if (isnull) p_values[pindex] = NULL; else ! p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], value); pindex++; } --- 2551,2557 ---- if (isnull) p_values[pindex] = NULL; else ! 
p_values[pindex] = OutputFunctionCall(&p_flinfo[pindex], value); pindex++; } *************** *** 2250,2256 **** convert_prep_stmt_params(PgFdwModifyState *fmstate, reset_transmission_modes(nestlevel); } ! Assert(pindex == fmstate->p_nums); MemoryContextSwitchTo(oldcontext); --- 2559,2565 ---- reset_transmission_modes(nestlevel); } ! Assert(pindex == p_nums); MemoryContextSwitchTo(oldcontext); *************** *** 2290,2295 **** store_returning_result(PgFdwModifyState *fmstate, --- 2599,2705 ---- } /* + * init_foreign_fetch_state + * Initialize an execution state for fetching/locking foreign rows + */ + static void + init_foreign_fetch_state(EState *estate, + ExecRowMark *erm, + List *fdw_private, + int eflags) + { + PgFdwFetchState *ffstate; + Relation rel = erm->relation; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + Oid typefnoid; + bool isvarlena; + + /* Begin constructing PgFdwFetchState. */ + ffstate = (PgFdwFetchState *) palloc0(sizeof(PgFdwFetchState)); + ffstate->rel = rel; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(erm->rti, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(RelationGetRelid(rel)); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + /* Open connection; report that we'll create a prepared statement. */ + ffstate->conn = GetConnection(server, user, true); + ffstate->p_name = NULL; /* prepared statement not made yet */ + + /* Deconstruct fdw_private data. */ + ffstate->query = strVal(list_nth(fdw_private, + FdwScanPrivateSelectSql2)); + ffstate->retrieved_attrs = (List *) list_nth(fdw_private, + FdwScanPrivateRetrievedAttrs2); + + /* Create context for per-tuple temp workspace. 
*/ + ffstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_MINSIZE, + ALLOCSET_SMALL_INITSIZE, + ALLOCSET_SMALL_MAXSIZE); + + /* Prepare for input conversion of SELECT results. */ + ffstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel)); + + /* Prepare for output conversion of parameters used in prepared stmt. */ + ffstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo)); + ffstate->p_nums = 0; + + /* Only one transmittable parameter will be ctid */ + getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &ffstate->p_flinfo[ffstate->p_nums]); + ffstate->p_nums++; + + erm->fdw_state = ffstate; + } + + /* + * finish_foreign_fetch_state + * Finish an execution state for fetching/locking foreign rows + */ + static void + finish_foreign_fetch_state(EState *estate, ExecRowMark *erm) + { + PgFdwFetchState *ffstate = (PgFdwFetchState *) erm->fdw_state; + + /* If we created a prepared statement, destroy it */ + if (ffstate->p_name) + { + char sql[64]; + PGresult *res; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", ffstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexec(ffstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, ffstate->conn, true, sql); + PQclear(res); + ffstate->p_name = NULL; + } + + /* Release remote connection */ + ReleaseConnection(ffstate->conn); + ffstate->conn = NULL; + } + + /* * postgresAnalyzeForeignTable * Test whether analyzing this foreign table is supported */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers