On 2010-10-04 6:19 AM, Steve Singer wrote:
I also noticed that functions.c is now generating a warning that should be
easy to clean up.
functions.c: In function 'sql_exec_error_callback':
functions.c:989: warning: 'es' may be used uninitialized in this function
functions.c: In function 'fmgr_sql':
functions.c:712: warning: 'es' is used uninitialized in this function
Those didn't look like actual bugs to me, but fixed in the attached
patch. Thanks.
Currently pg_parse_and_rewrite() returns all Query nodes in one huge list.
That's not acceptable for this patch since that list is already missing the
information we need: when should we take a new snapshot? So the patch breaks
the API of pg_parse_and_rewrite() to return a list of lists instead, but I'm
not convinced that's a bright idea since third party code might use it, so I
suggested adding a new function. Then again, third party code can't use
pg_parse_and_rewrite() any way if/when the wCTE patch goes in.
Is there any third party code in particular that your thinking of? I don't
see anything that says pg_parse_and_rewrite is part of a stable server
side API (in contrast to SPI or something an third party index access method
or custom data-type would call).
Nope. I think I grepped contrib/ at some point and none of those were
using pg_parse_and_rewrite() so this is all just speculation. And yes,
it's not really part of any stable API but breaking third party modules
needlessly is not something I want to do. However, in this case there
is no way to avoid breaking them.
My primary concern is that any module using pg_parse_and_rewrite() will
more or less silently break once this patch goes in no matter what we
do. If we leave pg_parse_and_rewrite as is, they will do the wrong
thing (grab a new snapshot for every rewrite product). The problem
might not be noticeable (no reports of EXPLAIN ANALYZE behaving
differently for several years), but once the wCTE patch gets in, it will
be. If we modify pg_parse_and_rewrite like the patch does, modules
start behaving weirdly. So what I'm suggesting is:
- Deprecate pg_parse_and_rewrite(). I have no idea how the project
has done this in the past, but grepping the source code for
"deprecated" suggests that we just remove the function.
- Introduce a new function, specifically designed for SQL functions.
Both callers of pg_parse_and_rewrite (init_sql_fcache and
fmgr_sql_validator) call check_sql_fn_retval after
pg_parse_and_rewrite so I think we could merge those into the new
function.
Does anyone have any concerns about this? Better ideas?
Regards,
Marko Tiikkaja
*** a/src/backend/catalog/pg_proc.c
--- b/src/backend/catalog/pg_proc.c
***************
*** 832,838 **** fmgr_sql_validator(PG_FUNCTION_ARGS)
proc->proargtypes.values,
proc->pronargs);
(void) check_sql_fn_retval(funcoid, proc->prorettype,
!
querytree_list,
NULL, NULL);
}
else
--- 832,838 ----
proc->proargtypes.values,
proc->pronargs);
(void) check_sql_fn_retval(funcoid, proc->prorettype,
!
llast(querytree_list),
NULL, NULL);
}
else
*** a/src/backend/executor/functions.c
--- b/src/backend/executor/functions.c
***************
*** 90,107 **** typedef struct
ParamListInfo paramLI; /* Param list representing current args
*/
Tuplestorestate *tstore; /* where we accumulate result tuples */
JunkFilter *junkFilter; /* will be NULL if function returns
VOID */
! /* head of linked list of execution_state records */
! execution_state *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
! static execution_state *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
--- 90,107 ----
ParamListInfo paramLI; /* Param list representing current args
*/
Tuplestorestate *tstore; /* where we accumulate result tuples */
+ Snapshot snapshot;
JunkFilter *junkFilter; /* will be NULL if function returns
VOID */
! List *func_state;
} SQLFunctionCache;
typedef SQLFunctionCache *SQLFunctionCachePtr;
/* non-export function prototypes */
! static List *init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK);
static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
***************
*** 123,183 **** static void sqlfunction_destroy(DestReceiver *self);
/* Set up the list of per-query execution_state records for a SQL function */
! static execution_state *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
! execution_state *firstes = NULL;
! execution_state *preves = NULL;
execution_state *lasttages = NULL;
! ListCell *qtl_item;
! foreach(qtl_item, queryTree_list)
{
! Query *queryTree = (Query *) lfirst(qtl_item);
! Node *stmt;
! execution_state *newes;
! Assert(IsA(queryTree, Query));
! if (queryTree->commandType == CMD_UTILITY)
! stmt = queryTree->utilityStmt;
! else
! stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
! /* Precheck all commands for validity in a function */
! if (IsA(stmt, TransactionStmt))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a SQL
function",
!
CreateCommandTag(stmt))));
! if (fcache->readonly_func && !CommandIsReadOnly(stmt))
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
non-volatile function",
!
CreateCommandTag(stmt))));
! newes = (execution_state *) palloc(sizeof(execution_state));
! if (preves)
! preves->next = newes;
! else
! firstes = newes;
! newes->next = NULL;
! newes->status = F_EXEC_START;
! newes->setsResult = false; /* might change below */
! newes->lazyEval = false; /* might change below */
! newes->stmt = stmt;
! newes->qd = NULL;
! if (queryTree->canSetTag)
! lasttages = newes;
! preves = newes;
}
/*
--- 123,200 ----
/* Set up the list of per-query execution_state records for a SQL function */
! static List *
init_execution_state(List *queryTree_list,
SQLFunctionCachePtr fcache,
bool lazyEvalOK)
{
! execution_state *firstes;
! execution_state *preves;
execution_state *lasttages = NULL;
! List *eslist;
! ListCell *lc1;
! ListCell *lc2;
! List *qtlist;
! Query *queryTree;
!
!
! eslist = NIL;
! foreach(lc1, queryTree_list)
{
! qtlist = (List *) lfirst(lc1);
! firstes = NULL;
! preves = NULL;
! foreach(lc2, qtlist)
! {
! Node *stmt;
! execution_state *newes;
! queryTree = (Query *) lfirst(lc2);
! Assert(IsA(queryTree, Query));
! if (queryTree->commandType == CMD_UTILITY)
! stmt = queryTree->utilityStmt;
! else
! stmt = (Node *) pg_plan_query(queryTree, 0,
NULL);
! /* Precheck all commands for validity in a function */
! if (IsA(stmt, TransactionStmt))
! ereport(ERROR,
!
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
SQL function",
!
CreateCommandTag(stmt))));
! if (fcache->readonly_func && !CommandIsReadOnly(stmt))
! ereport(ERROR,
!
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! /* translator: %s is a SQL statement name */
! errmsg("%s is not allowed in a
non-volatile function",
!
CreateCommandTag(stmt))));
!
! newes = (execution_state *)
palloc(sizeof(execution_state));
! if (preves)
! preves->next = newes;
! else
! firstes = newes;
! newes->next = NULL;
! newes->status = F_EXEC_START;
! newes->setsResult = false; /* might change
below */
! newes->lazyEval = false; /* might change below */
! newes->stmt = stmt;
! newes->qd = NULL;
! if (queryTree->canSetTag)
! lasttages = newes;
!
! preves = newes;
! }
!
! eslist = lappend(eslist, firstes);
}
/*
***************
*** 210,216 **** init_execution_state(List *queryTree_list,
}
}
! return firstes;
}
/* Initialize the SQLFunctionCache for a SQL function */
--- 227,233 ----
}
}
! return eslist;
}
/* Initialize the SQLFunctionCache for a SQL function */
***************
*** 342,348 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
!
queryTree_list,
NULL,
&fcache->junkFilter);
--- 359,365 ----
*/
fcache->returnsTuple = check_sql_fn_retval(foid,
rettype,
!
llast(queryTree_list),
NULL,
&fcache->junkFilter);
***************
*** 374,397 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
- Snapshot snapshot;
DestReceiver *dest;
Assert(es->qd == NULL);
! /*
! * In a read-only function, use the surrounding query's snapshot;
! * otherwise take a new snapshot for each query. The snapshot should
! * include a fresh command ID so that all work to date in this
transaction
! * is visible.
! */
! if (fcache->readonly_func)
! snapshot = GetActiveSnapshot();
! else
! {
! CommandCounterIncrement();
! snapshot = GetTransactionSnapshot();
! }
/*
* If this query produces the function result, send its output to the
--- 391,401 ----
static void
postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
{
DestReceiver *dest;
Assert(es->qd == NULL);
! Assert(ActiveSnapshotSet());
/*
* If this query produces the function result, send its output to the
***************
*** 415,427 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
! snapshot,
InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
!
snapshot,
dest,
fcache->paramLI);
--- 419,432 ----
if (IsA(es->stmt, PlannedStmt))
es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
fcache->src,
!
GetActiveSnapshot(),
!
InvalidSnapshot,
dest,
fcache->paramLI, 0);
else
es->qd = CreateUtilityQueryDesc(es->stmt,
fcache->src,
!
GetActiveSnapshot(),
dest,
fcache->paramLI);
***************
*** 617,622 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 622,629 ----
execution_state *es;
TupleTableSlot *slot;
Datum result;
+ List *eslist;
+ ListCell *eslc;
/*
* Switch to context in which the fcache lives. This ensures that
***************
*** 668,680 **** fmgr_sql(PG_FUNCTION_ARGS)
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
! es = fcache->func_state;
/*
* Convert params to appropriate format if starting a fresh execution.
(If
* continuing execution, we can re-use prior params.)
*/
! if (es && es->status == F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
--- 675,687 ----
init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
}
! eslist = fcache->func_state;
/*
* Convert params to appropriate format if starting a fresh execution.
(If
* continuing execution, we can re-use prior params.)
*/
! if (linitial(eslist) && ((execution_state *) linitial(eslist))->status
== F_EXEC_START)
postquel_sub_params(fcache, fcinfo);
/*
***************
*** 687,694 **** fmgr_sql(PG_FUNCTION_ARGS)
/*
* Find first unfinished query in function.
*/
! while (es && es->status == F_EXEC_DONE)
! es = es->next;
/*
* Execute each command in the function one after another until we
either
--- 694,711 ----
/*
* Find first unfinished query in function.
*/
!
! es = NULL; /* keep compiler quiet */
! foreach(eslc, eslist)
! {
! es = (execution_state *) lfirst(eslc);
!
! while (es && es->status == F_EXEC_DONE)
! es = es->next;
!
! if (es)
! break;
! }
/*
* Execute each command in the function one after another until we
either
***************
*** 699,706 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 716,746 ----
bool completed;
if (es->status == F_EXEC_START)
+ {
+ if (!fcache->readonly_func)
+ {
+ /*
+ * In a read-only function, use the surrounding
query's snapshot;
+ * otherwise take a new snapshot if we don't
have one yet. The
+ * snapshot should include a fresh command ID
so that all work to
+ * date in this transaction is visible.
+ */
+ if (!fcache->snapshot)
+ {
+ CommandCounterIncrement();
+ fcache->snapshot =
RegisterSnapshot(GetTransactionSnapshot());
+ PushActiveSnapshot(fcache->snapshot);
+ }
+ else
+ PushUpdatedSnapshot(fcache->snapshot);
+ }
+
postquel_start(es, fcache);
+ if (!fcache->readonly_func)
+ PopActiveSnapshot();
+ }
+
completed = postquel_getnext(es, fcache);
/*
***************
*** 726,731 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 766,790 ----
if (es->status != F_EXEC_DONE)
break;
es = es->next;
+
+ if (!es)
+ {
+ eslc = lnext(eslc);
+ if (!eslc)
+ break;
+
+ es = (execution_state *) lfirst(eslc);
+
+ /* make sure we take a new snapshot for this query list
*/
+ if (!fcache->readonly_func)
+ {
+ Assert(fcache->snapshot != InvalidSnapshot);
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
+ }
+ else
+ Assert(fcache->snapshot == InvalidSnapshot);
+ }
}
/*
***************
*** 794,799 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 853,863 ----
PointerGetDatum(fcache));
fcache->shutdown_reg = false;
}
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
else
{
***************
*** 820,825 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 884,894 ----
PointerGetDatum(fcache));
fcache->shutdown_reg = false;
}
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
}
else
***************
*** 850,855 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 919,929 ----
/* Clear the tuplestore, but keep it for next time */
tuplestore_clear(fcache->tstore);
+
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
}
/*
***************
*** 858,868 **** fmgr_sql(PG_FUNCTION_ARGS)
*/
if (es == NULL)
{
! es = fcache->func_state;
! while (es)
{
! es->status = F_EXEC_START;
! es = es->next;
}
}
--- 932,945 ----
*/
if (es == NULL)
{
! foreach(eslc, fcache->func_state)
{
! es = (execution_state *) lfirst(eslc);
! while (es)
! {
! es->status = F_EXEC_START;
! es = es->next;
! }
}
}
***************
*** 913,931 **** sql_exec_error_callback(void *arg)
{
execution_state *es;
int query_num;
! es = fcache->func_state;
query_num = 1;
! while (es)
{
! if (es->qd)
{
! errcontext("SQL function \"%s\" statement %d",
! fcache->fname, query_num);
! break;
}
- es = es->next;
- query_num++;
}
if (es == NULL)
{
--- 990,1013 ----
{
execution_state *es;
int query_num;
+ ListCell *lc;
! es = NULL; /* keep compiler quiet */
query_num = 1;
! foreach(lc, fcache->func_state)
{
! es = (execution_state *) lfirst(lc);
! while (es)
{
! if (es->qd)
! {
! errcontext("SQL function \"%s\"
statement %d",
! fcache->fname,
query_num);
! break;
! }
! es = es->next;
! query_num++;
}
}
if (es == NULL)
{
***************
*** 956,973 **** static void
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
! execution_state *es = fcache->func_state;
! while (es != NULL)
{
! /* Shut down anything still running */
! if (es->status == F_EXEC_RUN)
! postquel_end(es);
! /* Reset states to START in case we're called again */
! es->status = F_EXEC_START;
! es = es->next;
}
/* Release tuplestore if we have one */
if (fcache->tstore)
tuplestore_end(fcache->tstore);
--- 1038,1066 ----
ShutdownSQLFunction(Datum arg)
{
SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
! execution_state *es;
! ListCell *lc;
! foreach(lc, fcache->func_state)
{
! es = (execution_state *) lfirst(lc);
!
! while (es)
! {
! /* Shut down anything still running */
! if (es->status == F_EXEC_RUN)
! postquel_end(es);
! /* Reset states to START in case we're called again */
! es->status = F_EXEC_START;
! es = es->next;
! }
}
+ /* Unregister snapshot if we have one */
+ if (fcache->snapshot != InvalidSnapshot)
+ UnregisterSnapshot(fcache->snapshot);
+ fcache->snapshot = InvalidSnapshot;
+
/* Release tuplestore if we have one */
if (fcache->tstore)
tuplestore_end(fcache->tstore);
*** a/src/backend/executor/spi.c
--- b/src/backend/executor/spi.c
***************
*** 1769,1774 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1769,1775 ----
SPITupleTable *my_tuptable = NULL;
int res = 0;
bool have_active_snap = ActiveSnapshotSet();
+ bool registered_snap = false;
ErrorContextCallback spierrcontext;
CachedPlan *cplan = NULL;
ListCell *lc1;
***************
*** 1872,1879 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
}
else
{
!
PushActiveSnapshot(GetTransactionSnapshot());
pushed_active_snap = true;
}
}
else
--- 1873,1882 ----
}
else
{
! snapshot =
RegisterSnapshot(GetTransactionSnapshot());
! PushActiveSnapshot(snapshot);
pushed_active_snap = true;
+ registered_snap = true;
}
}
else
***************
*** 1966,1975 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1969,1991 ----
*/
if (!read_only)
CommandCounterIncrement();
+
+ /*
+ * If we took a new snapshot for this query list, unregister it
and
+ * make sure we take a new one for the next list.
+ */
+ if (registered_snap)
+ {
+ UnregisterSnapshot(snapshot);
+ snapshot = InvalidSnapshot;
+ }
}
fail:
+ if (registered_snap)
+ UnregisterSnapshot(snapshot);
+
/* We no longer need the cached plan refcount, if any */
if (cplan)
ReleaseCachedPlan(cplan, true);
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 537,547 **** pg_parse_and_rewrite(const char *query_string, /* string to
execute */
{
Node *parsetree = (Node *) lfirst(list_item);
! querytree_list = list_concat(querytree_list,
!
pg_analyze_and_rewrite(parsetree,
!
query_string,
!
paramTypes,
!
numParams));
}
return querytree_list;
--- 537,547 ----
{
Node *parsetree = (Node *) lfirst(list_item);
! querytree_list = lappend(querytree_list,
!
pg_analyze_and_rewrite(parsetree,
!
query_string,
!
paramTypes,
!
numParams));
}
return querytree_list;
*** a/src/backend/tcop/pquery.c
--- b/src/backend/tcop/pquery.c
***************
*** 170,180 **** ProcessQuery(PlannedStmt *plan,
elog(DEBUG3, "ProcessQuery");
/*
- * Must always set a snapshot for plannable queries.
- */
- PushActiveSnapshot(GetTransactionSnapshot());
-
- /*
* Create the QueryDesc object
*/
queryDesc = CreateQueryDesc(plan, sourceText,
--- 170,175 ----
***************
*** 234,241 **** ProcessQuery(PlannedStmt *plan,
/* Now take care of any queued AFTER triggers */
AfterTriggerEndQuery(queryDesc->estate);
- PopActiveSnapshot();
-
/*
* Now, we close down all the scans and free allocated resources.
*/
--- 229,234 ----
***************
*** 1220,1225 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1213,1219 ----
char *completionTag)
{
ListCell *stmtlist_item;
+ Snapshot snapshot = InvalidSnapshot;
/*
* If the destination is DestRemoteExecute, change to DestNone. The
***************
*** 1262,1267 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1256,1270 ----
if (log_executor_stats)
ResetUsage();
+ /* if no snapshot is set, grab a new one and register
it */
+ if (snapshot == InvalidSnapshot)
+ {
+ snapshot =
RegisterSnapshot(GetTransactionSnapshot());
+ PushActiveSnapshot(snapshot);
+ }
+ else
+ PushUpdatedSnapshot(snapshot);
+
if (pstmt->canSetTag)
{
/* statement can set tag string */
***************
*** 1279,1284 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1282,1289 ----
altdest, NULL);
}
+ PopActiveSnapshot();
+
if (log_executor_stats)
ShowUsage("EXECUTOR STATISTICS");
***************
*** 1291,1301 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1296,1320 ----
*
* These are assumed canSetTag if they're the only stmt
in the
* portal.
+ *
+ * NotifyStmt is the only utility statement allowed in
a list of
+ * rewritten queries, and it doesn't need a snapshot so
we don't
+ * need to worry about it. However, if the list has
only one
+ * statement and it's a utility statement, we are not
allowed to
+ * take a snapshot. See the first comment in
PortalRunUtility().
*/
if (list_length(portal->stmts) == 1)
+ {
+ Assert(snapshot == InvalidSnapshot);
+
PortalRunUtility(portal, stmt, isTopLevel,
dest, completionTag);
+ }
else
+ {
+ Assert(IsA(stmt, NotifyStmt));
+
PortalRunUtility(portal, stmt, isTopLevel,
altdest, NULL);
+ }
}
/*
***************
*** 1313,1318 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1332,1340 ----
MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
}
+ if (snapshot != InvalidSnapshot)
+ UnregisterSnapshot(snapshot);
+
/*
* If a command completion tag was supplied, use it. Otherwise use the
* portal's commandTag as the default completion tag.
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers