Hi,
I looked at fixing this inconsistency by making all query-list snapshot
handling work the way EXPLAIN ANALYZE's code does. The only reason I went
this way is that implementing wCTEs (writable CTEs) on top of this
behaviour is a lot easier.
There were three places that needed fixing. The SPI and portal logic
changes were quite straightforward. The SQL-language function code was
harder: it previously didn't know which query trees in the execution_state
list belonged to which original query, so there was no way to tell when
we actually needed to take a new snapshot. The approach I took was to
change the representation of the SQL function cache to a list of
execution_state lists (one per original query) and to grab a new
snapshot between the lists.
The patch needs some more comments and a bit of cleanup, but I thought
I'd get your input first. Thoughts?
Regards,
Marko Tiikkaja
*** a/src/backend/catalog/pg_proc.c
--- b/src/backend/catalog/pg_proc.c
***************
*** 832,838 **** fmgr_sql_validator(PG_FUNCTION_ARGS)
proc->proargtypes.values,
proc->pronargs);
(void) check_sql_fn_retval(funcoid, proc->prorettype,
!
querytree_list,
NULL, NULL);
}
else
--- 832,838 ----
proc->proargtypes.values,
proc->pronargs);
(void) check_sql_fn_retval(funcoid, proc->prorettype,
!
llast(querytree_list),
NULL, NULL);
}
else
*** a/src/backend/executor/functions.c
--- b/src/backend/executor/functions.c
***************
*** 90,107 **** typedef struct
ParamListInfo paramLI; /* Param list representing current args
*/
Tuplestorestate *tstore; /* where we accumulate result tuples */
JunkFilter *junkFilter; /* will be NULL if function returns
VOID */
! /* head of linked list of execution_state records */
! execution_state *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
! static execution_state *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
--- 90,107 ----
ParamListInfo paramLI; /* Param list representing current args
*/
Tuplestorestate *tstore; /* where we accumulate result tuples */
+ Snapshot snapshot;
JunkFilter *junkFilter; /* will be NULL if function returns
VOID */
! List *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
! static List *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
***************
*** 123,183 **** static void sqlfunction_destroy(DestReceiver *self);
/* Set up the list of per-query execution_state records for a SQL function */
! static execution_state *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
! execution_state *firstes = NULL;
! execution_state *preves = NULL;
execution_state *lasttages = NULL;
! ListCell *qtl_item;
! foreach(qtl_item, queryTree_list)
{
! Query *queryTree = (Query *) lfirst(qtl_item);
! Node *stmt;
! execution_state *newes;
! Assert(IsA(queryTree, Query));
! if (queryTree->commandType == CMD_UTILITY)
! stmt = queryTree->utilityStmt;
! else
! stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
! /* Precheck all commands for validity in a function */
! if (IsA(stmt, TransactionStmt))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a SQL
function",
!
CreateCommandTag(stmt))));
! if (fcache->readonly_func && !CommandIsReadOnly(stmt))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
non-volatile function",
!
CreateCommandTag(stmt))));
! newes = (execution_state *) palloc(sizeof(execution_state));
! if (preves)
! preves->next = newes;
! else
! firstes = newes;
! newes->next = NULL;
! newes->status = F_EXEC_START;
! newes->setsResult = false; /* might change below */
! newes->lazyEval = false; /* might change below */
! newes->stmt = stmt;
! newes->qd = NULL;
! if (queryTree->canSetTag)
! lasttages = newes;
! preves = newes;
}
/*
--- 123,200 ----
/* Set up the list of per-query execution_state records for a SQL function */
! static List *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
! execution_state *firstes;
! execution_state *preves;
execution_state *lasttages = NULL;
! List *eslist;
! ListCell *lc1;
! ListCell *lc2;
! List *qtlist;
! Query *queryTree;
!
!
! eslist = NIL;
! foreach(lc1, queryTree_list)
{
! qtlist = (List *) lfirst(lc1);
! firstes = NULL;
! preves = NULL;
! foreach(lc2, qtlist)
! {
! Node *stmt;
! execution_state *newes;
! queryTree = (Query *) lfirst(lc2);
! Assert(IsA(queryTree, Query));
! if (queryTree->commandType == CMD_UTILITY)
! stmt = queryTree->utilityStmt;
! else
! stmt = (Node *) pg_plan_query(queryTree, 0,
NULL);
! /* Precheck all commands for validity in a function */
! if (IsA(stmt, TransactionStmt))
! ereport(ERROR,
!
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
SQL function",
!
CreateCommandTag(stmt))));
!
! if (fcache->readonly_func && !CommandIsReadOnly(stmt))
! ereport(ERROR,
!
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
non-volatile function",
!
CreateCommandTag(stmt))));
!
! newes = (execution_state *)
palloc(sizeof(execution_state));
! if (preves)
! preves->next = newes;
! else
! firstes = newes;
! newes->next = NULL;
! newes->status = F_EXEC_START;
! newes->setsResult = false; /* might change
below */
! newes->lazyEval = false; /* might change below */
! newes->stmt = stmt;
! newes->qd = NULL;
! if (queryTree->canSetTag)
! lasttages = newes;
!
! preves = newes;
! }
! eslist = lappend(eslist, firstes);
}
/*
***************
*** 210,216 **** init_execution_state(List *queryTree_list,
}
}
! return firstes;
}
/* Initialize the SQLFunctionCache for a SQL function */
--- 227,233 ----
}
}
! return eslist;
}
/* Initialize the SQLFunctionCache for a SQL function */
***************
*** 342,348 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
!
queryTree_list,
NULL,
&fcache->junkFilter);
--- 359,365 ----
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
!
llast(queryTree_list),
NULL,
&fcache->junkFilter);
***************
*** 374,397 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
- Snapshot snapshot;
DestReceiver *dest;
Assert(es->qd == NULL);
! /*
! * In a read-only function, use the surrounding query's snapshot;
! * otherwise take a new snapshot for each query. The snapshot should
! * include a fresh command ID so that all work to date in this
transaction
! * is visible.
! */
! if (fcache->readonly_func)
! snapshot = GetActiveSnapshot();
! else
! {
! CommandCounterIncrement();
! snapshot = GetTransactionSnapshot();
! }
/*
* If this query produces the function result, send its output to the
--- 391,401 ----
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
DestReceiver *dest;
Assert(es->qd == NULL);
! Assert(ActiveSnapshotSet());
/*
* If this query produces the function result, send its output to the
***************
*** 415,427 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
! snapshot,
InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
!
snapshot,
dest,
fcache->paramLI);
--- 419,432 ----
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
!
GetActiveSnapshot(),
!
InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
!
GetActiveSnapshot(),
dest,
fcache->paramLI);
***************
*** 617,622 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 622,629 ----
execution_state *es;
TupleTableSlot *slot;
Datum result;
+ List *eslist;
+ ListCell *eslc;
/*
* Switch to context in which the fcache lives. This ensures that
***************
*** 668,680 **** fmgr_sql(PG_FUNCTION_ARGS)
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
! es = fcache->func_state;
/*
* Convert params to appropriate format if starting a fresh execution.
(If
* continuing execution, we can re-use prior params.)
*/
! if (es && es->status == F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
--- 675,687 ----
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
! eslist = fcache->func_state;
/*
* Convert params to appropriate format if starting a fresh execution.
(If
* continuing execution, we can re-use prior params.)
*/
! if (linitial(eslist) && ((execution_state *) linitial(eslist))->status
== F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
***************
*** 687,694 **** fmgr_sql(PG_FUNCTION_ARGS)
/*
* Find first unfinished query in function.
*/
! while (es && es->status == F_EXEC_DONE)
! es = es->next;
/*
* Execute each command in the function one after another until we
either
--- 694,709 ----
/*
* Find first unfinished query in function.
*/
! foreach(eslc, eslist)
! {
! es = (execution_state *) lfirst(eslc);
!
! while (es && es->status == F_EXEC_DONE)
! es = es->next;
!
! if (es)
! break;
! }
/*
* Execute each command in the function one after another until we
either
***************
*** 699,706 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 714,744 ----
bool completed;
if (es->status == F_EXEC_START)
+ {
+ if (!fcache->readonly_func)
+ {
+ /*
+ * In a read-only function, use the surrounding
query's snapshot;
+ * otherwise take a new snapshot if we don't
have one yet. The
+ * snapshot should include a fresh command ID
so that all work to
+ * date in this transaction is visible.
+ */
+ if (!fcache->snapshot)
+ {
+ CommandCounterIncrement();
+ fcache->snapshot =
RegisterSnapshot(GetTransactionSnapshot());
+ PushActiveSnapshot(fcache->snapshot);
+ }
+ else
+ PushUpdatedSnapshot(fcache->snapshot);
+ }
+
postquel_start(es, fcache);
+ if (!fcache->readonly_func)
+ PopActiveSnapshot();
+ }
+
completed = postquel_getnext(es, fcache);
/*
***************
*** 726,731 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 764,783 ----
if (es->status != F_EXEC_DONE)
break;
es = es->next;
+
+ if (!es)
+ {
+ eslc = lnext(eslc);
+ if (!eslc)
+ break;
+
+ es = (execution_state *) lfirst(eslc);
+
+ /* make sure we take a new snapshot for this query list
*/
+ Assert(fcache->snapshot != InvalidSnapshot);
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
+ }
}
/*
***************
*** 794,799 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 846,856 ----
PointerGetDatum(fcache));
fcache->shutdown_reg = false;
}
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
else
{
***************
*** 820,825 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 877,887 ----
PointerGetDatum(fcache));
fcache->shutdown_reg = false;
}
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
}
else
***************
*** 850,855 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 912,922 ----
/* Clear the tuplestore, but keep it for next time */
tuplestore_clear(fcache->tstore);
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
/*
***************
*** 858,868 **** fmgr_sql(PG_FUNCTION_ARGS)
*/
if (es == NULL)
{
! es = fcache->func_state;
! while (es)
{
! es->status = F_EXEC_START;
! es = es->next;
}
}
--- 925,938 ----
*/
if (es == NULL)
{
! foreach(eslc, fcache->func_state)
{
! es = (execution_state *) lfirst(eslc);
! while (es)
! {
! es->status = F_EXEC_START;
! es = es->next;
! }
}
}
***************
*** 913,931 **** sql_exec_error_callback(void *arg)
{
execution_state *es;
int query_num;
- es = fcache->func_state;
query_num = 1;
! while (es)
{
! if (es->qd)
{
! errcontext("SQL function \"%s\" statement %d",
! fcache->fname, query_num);
! break;
}
- es = es->next;
- query_num++;
}
if (es == NULL)
{
--- 983,1006 ----
{
execution_state *es;
int query_num;
+ ListCell *lc;
query_num = 1;
!
! foreach(lc, fcache->func_state)
{
! es = (execution_state *) lfirst(lc);
! while (es)
{
! if (es->qd)
! {
! errcontext("SQL function \"%s\"
statement %d",
! fcache->fname,
query_num);
! break;
! }
! es = es->next;
! query_num++;
}
}
if (es == NULL)
{
***************
*** 956,973 **** static void
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
! execution_state *es = fcache->func_state;
! while (es != NULL)
{
! /* Shut down anything still running */
! if (es->status == F_EXEC_RUN)
! postquel_end(es);
! /* Reset states to START in case we're called again */
! es->status = F_EXEC_START;
! es = es->next;
}
/* Release tuplestore if we have one */
if (fcache->tstore)
tuplestore_end(fcache->tstore);
--- 1031,1059 ----
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
! execution_state *es;
! ListCell *lc;
! foreach(lc, fcache->func_state)
{
! es = (execution_state *) lfirst(lc);
!
! while (es)
! {
! /* Shut down anything still running */
! if (es->status == F_EXEC_RUN)
! postquel_end(es);
! /* Reset states to START in case we're called again */
! es->status = F_EXEC_START;
! es = es->next;
! }
}
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
+
/* Release tuplestore if we have one */
if (fcache->tstore)
tuplestore_end(fcache->tstore);
*** a/src/backend/executor/spi.c
--- b/src/backend/executor/spi.c
***************
*** 1769,1774 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1769,1775 ----
SPITupleTable *my_tuptable = NULL;
int res = 0;
bool have_active_snap = ActiveSnapshotSet();
+ bool registered_snap = false;
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
***************
*** 1872,1879 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
}
else
{
!
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
}
}
else
--- 1873,1882 ----
}
else
{
! snapshot =
RegisterSnapshot(GetTransactionSnapshot());
! PushActiveSnapshot(snapshot);
pushed_active_snap = true;
+ registered_snap = true;
}
}
else
***************
*** 1966,1975 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1969,1991 ----
*/
if (!read_only)
CommandCounterIncrement();
+
+ /*
+ * If we took a new snapshot for this query list, unregister it
and
+ * make sure we take a new one for the next list.
+ */
+ if (registered_snap)
+ {
+ UnregisterSnapshot(snapshot);
+ snapshot = InvalidSnapshot;
+ }
}
fail:
+ if (registered_snap)
+ UnregisterSnapshot(snapshot);
+
/* We no longer need the cached plan refcount, if any */
if (cplan)
ReleaseCachedPlan(cplan, true);
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 537,547 **** pg_parse_and_rewrite(const char *query_string, /* string to
execute */
{
Node *parsetree = (Node *) lfirst(list_item);
! querytree_list = list_concat(querytree_list,
!
pg_analyze_and_rewrite(parsetree,
!
query_string,
!
paramTypes,
!
numParams));
}
return querytree_list;
--- 537,547 ----
{
Node *parsetree = (Node *) lfirst(list_item);
! querytree_list = lappend(querytree_list,
!
pg_analyze_and_rewrite(parsetree,
!
query_string,
!
paramTypes,
!
numParams));
}
return querytree_list;
*** a/src/backend/tcop/pquery.c
--- b/src/backend/tcop/pquery.c
***************
*** 170,180 **** ProcessQuery(PlannedStmt *plan,
elog(DEBUG3, "ProcessQuery");
/*
- * Must always set a snapshot for plannable queries.
- */
- PushActiveSnapshot(GetTransactionSnapshot());
-
- /*
* Create the QueryDesc object
*/
queryDesc = CreateQueryDesc(plan, sourceText,
--- 170,175 ----
***************
*** 234,241 **** ProcessQuery(PlannedStmt *plan,
/* Now take care of any queued AFTER triggers */
AfterTriggerEndQuery(queryDesc->estate);
- PopActiveSnapshot();
-
/*
* Now, we close down all the scans and free allocated resources.
*/
--- 229,234 ----
***************
*** 1220,1225 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1213,1219 ----
char *completionTag)
{
ListCell *stmtlist_item;
+ Snapshot snapshot = InvalidSnapshot;
/*
* If the destination is DestRemoteExecute, change to DestNone. The
***************
*** 1262,1267 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1256,1270 ----
if (log_executor_stats)
ResetUsage();
+ /* if no snapshot is set, grab a new one and register
it */
+ if (snapshot == InvalidSnapshot)
+ {
+ snapshot =
RegisterSnapshot(GetTransactionSnapshot());
+ PushActiveSnapshot(snapshot);
+ }
+ else
+ PushUpdatedSnapshot(snapshot);
+
if (pstmt->canSetTag)
{
/* statement can set tag string */
***************
*** 1279,1284 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1282,1289 ----
altdest, NULL);
}
+ PopActiveSnapshot();
+
if (log_executor_stats)
ShowUsage("EXECUTOR STATISTICS");
***************
*** 1291,1301 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1296,1320 ----
*
* These are assumed canSetTag if they're the only stmt
in the
* portal.
+ *
+ * NotifyStmt is the only utility statement allowed in
a list of
+ * rewritten queries, and it doesn't need a snapshot so
we don't
+ * need to worry about it. However, if the list has
only one
+ * statement and it's a utility statement, we are not
allowed to
+ * take a snapshot. See the first comment in
PortalRunUtility().
*/
if (list_length(portal->stmts) == 1)
+ {
+ Assert(snapshot == InvalidSnapshot);
+
PortalRunUtility(portal, stmt, isTopLevel,
dest, completionTag);
+ }
else
+ {
+ Assert(IsA(stmt, NotifyStmt));
+
PortalRunUtility(portal, stmt, isTopLevel,
altdest, NULL);
+ }
}
/*
***************
*** 1313,1318 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1332,1340 ----
MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
}
+ if (snapshot != InvalidSnapshot)
+ UnregisterSnapshot(snapshot);
+
/*
* If a command completion tag was supplied, use it. Otherwise use the
* portal's commandTag as the default completion tag.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers