Hi,

Here's an updated patch.

I'm still not too fond of the logic in spi.c, but I can't see a better way to do it. If someone does see a better way, I'm not going to object.

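I also made some changes to the SQL function code (functions.c) now that we have a different snapshot API. The previous code pushed and popped snapshots quite heavily: postquel_getnext() and postquel_end() each pushed the query's snapshot and popped it again on every call. Now fmgr_sql() pushes a snapshot once per query list (for functions that are not read-only) and pops it when that list is done.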

I'd also like to see more regression tests for SQL functions, but I'm going to submit my suggestions as a separate patch.


Regards,
Marko Tiikkaja
*** a/src/backend/catalog/pg_proc.c
--- b/src/backend/catalog/pg_proc.c
***************
*** 755,761 **** fmgr_sql_validator(PG_FUNCTION_ARGS)
--- 755,763 ----
        Oid                     funcoid = PG_GETARG_OID(0);
        HeapTuple       tuple;
        Form_pg_proc proc;
+       List       *raw_parsetree_list;
        List       *querytree_list;
+       ListCell   *list_item;
        bool            isnull;
        Datum           tmp;
        char       *prosrc;
***************
*** 828,836 **** fmgr_sql_validator(PG_FUNCTION_ARGS)
                 */
                if (!haspolyarg)
                {
!                       querytree_list = pg_parse_and_rewrite(prosrc,
!                                                                               
                  proc->proargtypes.values,
!                                                                               
                  proc->pronargs);
                        (void) check_sql_fn_retval(funcoid, proc->prorettype,
                                                                           
querytree_list,
                                                                           
NULL, NULL);
--- 830,854 ----
                 */
                if (!haspolyarg)
                {
!                       /*
!                        * Parse and rewrite the queries in the function text.
!                        *
!                        * Even though check_sql_fn_retval is only interested 
in the last
!                        * query, we analyze all of them here to check for any 
errors.
!                        */
!                       raw_parsetree_list = pg_parse_query(prosrc);
!                       
!                       querytree_list = NIL;
!                       foreach(list_item, raw_parsetree_list)
!                       {
!                               Node *parsetree = (Node *) lfirst(list_item);
! 
!                               querytree_list = 
pg_analyze_and_rewrite(parsetree, prosrc,
!                                                                               
                                proc->proargtypes.values, proc->pronargs);
!                       }
! 
!                       Assert(querytree_list != NIL);
! 
                        (void) check_sql_fn_retval(funcoid, proc->prorettype,
                                                                           
querytree_list,
                                                                           
NULL, NULL);
*** a/src/backend/executor/functions.c
--- b/src/backend/executor/functions.c
***************
*** 84,89 **** typedef struct
--- 84,90 ----
        bool            returnsSet;             /* true if returning multiple 
rows */
        bool            returnsTuple;   /* true if returning whole tuple result 
*/
        bool            shutdown_reg;   /* true if registered shutdown callback 
*/
+       bool            snapshot;               /* true if pushed an active 
snapshot */
        bool            readonly_func;  /* true to run in "read only" mode */
        bool            lazyEval;               /* true if using lazyEval for 
result query */
  
***************
*** 93,107 **** typedef struct
  
        JunkFilter *junkFilter;         /* will be NULL if function returns 
VOID */
  
!       /* head of linked list of execution_state records */
!       execution_state *func_state;
  } SQLFunctionCache;
  
  typedef SQLFunctionCache *SQLFunctionCachePtr;
  
  
  /* non-export function prototypes */
! static execution_state *init_execution_state(List *queryTree_list,
                                         SQLFunctionCachePtr fcache,
                                         bool lazyEvalOK);
  static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
--- 94,107 ----
  
        JunkFilter *junkFilter;         /* will be NULL if function returns 
VOID */
  
!       List *func_state;
  } SQLFunctionCache;
  
  typedef SQLFunctionCache *SQLFunctionCachePtr;
  
  
  /* non-export function prototypes */
! static List *init_execution_state(List *queryTree_list,
                                         SQLFunctionCachePtr fcache,
                                         bool lazyEvalOK);
  static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK);
***************
*** 123,183 **** static void sqlfunction_destroy(DestReceiver *self);
  
  
  /* Set up the list of per-query execution_state records for a SQL function */
! static execution_state *
  init_execution_state(List *queryTree_list,
                                         SQLFunctionCachePtr fcache,
                                         bool lazyEvalOK)
  {
!       execution_state *firstes = NULL;
!       execution_state *preves = NULL;
        execution_state *lasttages = NULL;
!       ListCell   *qtl_item;
  
!       foreach(qtl_item, queryTree_list)
        {
!               Query      *queryTree = (Query *) lfirst(qtl_item);
!               Node       *stmt;
!               execution_state *newes;
  
!               Assert(IsA(queryTree, Query));
  
!               if (queryTree->commandType == CMD_UTILITY)
!                       stmt = queryTree->utilityStmt;
!               else
!                       stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
  
!               /* Precheck all commands for validity in a function */
!               if (IsA(stmt, TransactionStmt))
!                       ereport(ERROR,
!                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                       /* translator: %s is a SQL statement name */
!                                        errmsg("%s is not allowed in a SQL 
function",
!                                                       
CreateCommandTag(stmt))));
  
!               if (fcache->readonly_func && !CommandIsReadOnly(stmt))
!                       ereport(ERROR,
!                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                       /* translator: %s is a SQL statement name */
!                                        errmsg("%s is not allowed in a 
non-volatile function",
!                                                       
CreateCommandTag(stmt))));
  
!               newes = (execution_state *) palloc(sizeof(execution_state));
!               if (preves)
!                       preves->next = newes;
!               else
!                       firstes = newes;
  
!               newes->next = NULL;
!               newes->status = F_EXEC_START;
!               newes->setsResult = false;              /* might change below */
!               newes->lazyEval = false;        /* might change below */
!               newes->stmt = stmt;
!               newes->qd = NULL;
  
!               if (queryTree->canSetTag)
!                       lasttages = newes;
  
!               preves = newes;
        }
  
        /*
--- 123,200 ----
  
  
  /* Set up the list of per-query execution_state records for a SQL function */
! static List *
  init_execution_state(List *queryTree_list,
                                         SQLFunctionCachePtr fcache,
                                         bool lazyEvalOK)
  {
!       execution_state *firstes;
!       execution_state *preves;
        execution_state *lasttages = NULL;
!       List       *eslist;
!       ListCell   *lc1;
!       ListCell   *lc2;
!       List       *qtlist;
!       Query      *queryTree;
  
! 
!       eslist = NIL;
! 
!       foreach(lc1, queryTree_list)
        {
!               qtlist = (List *) lfirst(lc1);
!               firstes = NULL;
!               preves = NULL;
  
!               foreach(lc2, qtlist)
!               {
!                       Node       *stmt;
!                       execution_state *newes;
  
!                       queryTree = (Query *) lfirst(lc2);
  
!                       Assert(IsA(queryTree, Query));
  
!                       if (queryTree->commandType == CMD_UTILITY)
!                               stmt = queryTree->utilityStmt;
!                       else
!                               stmt = (Node *) pg_plan_query(queryTree, 0, 
NULL);
  
!                       /* Precheck all commands for validity in a function */
!                       if (IsA(stmt, TransactionStmt))
!                               ereport(ERROR,
!                                               
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                               /* translator: %s is a SQL statement name */
!                                                errmsg("%s is not allowed in a 
SQL function",
!                                                               
CreateCommandTag(stmt))));
! 
!                       if (fcache->readonly_func && !CommandIsReadOnly(stmt))
!                               ereport(ERROR,
!                                               
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
!                               /* translator: %s is a SQL statement name */
!                                                errmsg("%s is not allowed in a 
non-volatile function",
!                                                               
CreateCommandTag(stmt))));
! 
!                       newes = (execution_state *) 
palloc(sizeof(execution_state));
!                       if (preves)
!                               preves->next = newes;
!                       else
!                               firstes = newes;
! 
!                       newes->next = NULL;
!                       newes->status = F_EXEC_START;
!                       newes->setsResult = false;              /* might change 
below */
!                       newes->lazyEval = false;        /* might change below */
!                       newes->stmt = stmt;
!                       newes->qd = NULL;
  
!                       if (queryTree->canSetTag)
!                               lasttages = newes;
  
!                       preves = newes;
!               }
  
!               eslist = lappend(eslist, firstes);
        }
  
        /*
***************
*** 210,216 **** init_execution_state(List *queryTree_list,
                }
        }
  
!       return firstes;
  }
  
  /* Initialize the SQLFunctionCache for a SQL function */
--- 227,233 ----
                }
        }
  
!       return eslist;
  }
  
  /* Initialize the SQLFunctionCache for a SQL function */
***************
*** 224,230 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
--- 241,249 ----
        SQLFunctionCachePtr fcache;
        Oid                *argOidVect;
        int                     nargs;
+       List       *raw_parsetree_list;
        List       *queryTree_list;
+       ListCell   *list_item;
        Datum           tmp;
        bool            isNull;
  
***************
*** 319,325 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
        /*
         * Parse and rewrite the queries in the function text.
         */
!       queryTree_list = pg_parse_and_rewrite(fcache->src, argOidVect, nargs);
  
        /*
         * Check that the function returns the type it claims to.  Although in
--- 338,356 ----
        /*
         * Parse and rewrite the queries in the function text.
         */
!       raw_parsetree_list = pg_parse_query(fcache->src);
! 
!       queryTree_list = NIL;
!       foreach(list_item, raw_parsetree_list)
!       {
!               Node *parsetree = (Node *) lfirst(list_item);
! 
!               queryTree_list = lappend(queryTree_list,
!                                                                
pg_analyze_and_rewrite(parsetree,
!                                                                               
                                fcache->src,
!                                                                               
                                argOidVect,
!                                                                               
                                nargs));
!       }
  
        /*
         * Check that the function returns the type it claims to.  Although in
***************
*** 342,348 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
         */
        fcache->returnsTuple = check_sql_fn_retval(foid,
                                                                                
           rettype,
!                                                                               
           queryTree_list,
                                                                                
           NULL,
                                                                                
           &fcache->junkFilter);
  
--- 373,379 ----
         */
        fcache->returnsTuple = check_sql_fn_retval(foid,
                                                                                
           rettype,
!                                                                               
           llast(queryTree_list),
                                                                                
           NULL,
                                                                                
           &fcache->junkFilter);
  
***************
*** 374,397 **** init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK)
  static void
  postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
  {
-       Snapshot        snapshot;
        DestReceiver *dest;
  
        Assert(es->qd == NULL);
  
!       /*
!        * In a read-only function, use the surrounding query's snapshot;
!        * otherwise take a new snapshot for each query.  The snapshot should
!        * include a fresh command ID so that all work to date in this 
transaction
!        * is visible.
!        */
!       if (fcache->readonly_func)
!               snapshot = GetActiveSnapshot();
!       else
!       {
!               CommandCounterIncrement();
!               snapshot = GetTransactionSnapshot();
!       }
  
        /*
         * If this query produces the function result, send its output to the
--- 405,415 ----
  static void
  postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
  {
        DestReceiver *dest;
  
        Assert(es->qd == NULL);
  
!       Assert(ActiveSnapshotSet());
  
        /*
         * If this query produces the function result, send its output to the
***************
*** 415,427 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
        if (IsA(es->stmt, PlannedStmt))
                es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
                                                                 fcache->src,
!                                                                snapshot, 
InvalidSnapshot,
                                                                 dest,
                                                                 
fcache->paramLI, 0);
        else
                es->qd = CreateUtilityQueryDesc(es->stmt,
                                                                                
fcache->src,
!                                                                               
snapshot,
                                                                                
dest,
                                                                                
fcache->paramLI);
  
--- 433,446 ----
        if (IsA(es->stmt, PlannedStmt))
                es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
                                                                 fcache->src,
!                                                                
GetActiveSnapshot(),
!                                                                
InvalidSnapshot,
                                                                 dest,
                                                                 
fcache->paramLI, 0);
        else
                es->qd = CreateUtilityQueryDesc(es->stmt,
                                                                                
fcache->src,
!                                                                               
GetActiveSnapshot(),
                                                                                
dest,
                                                                                
fcache->paramLI);
  
***************
*** 451,459 **** postquel_getnext(execution_state *es, SQLFunctionCachePtr 
fcache)
  {
        bool            result;
  
-       /* Make our snapshot the active one for any called functions */
-       PushActiveSnapshot(es->qd->snapshot);
- 
        if (es->qd->utilitystmt)
        {
                /* ProcessUtility needs the PlannedStmt for DECLARE CURSOR */
--- 470,475 ----
***************
*** 481,488 **** postquel_getnext(execution_state *es, SQLFunctionCachePtr 
fcache)
                result = (count == 0L || es->qd->estate->es_processed == 0);
        }
  
-       PopActiveSnapshot();
- 
        return result;
  }
  
--- 497,502 ----
***************
*** 496,509 **** postquel_end(execution_state *es)
        /* Utility commands don't need Executor. */
        if (es->qd->utilitystmt == NULL)
        {
-               /* Make our snapshot the active one for any called functions */
-               PushActiveSnapshot(es->qd->snapshot);
- 
                if (es->qd->operation != CMD_SELECT)
                        AfterTriggerEndQuery(es->qd->estate);
                ExecutorEnd(es->qd);
- 
-               PopActiveSnapshot();
        }
  
        (*es->qd->dest->rDestroy) (es->qd->dest);
--- 510,518 ----
***************
*** 617,622 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 626,633 ----
        execution_state *es;
        TupleTableSlot *slot;
        Datum           result;
+       List            *eslist;
+       ListCell    *eslc;
  
        /*
         * Switch to context in which the fcache lives.  This ensures that
***************
*** 668,680 **** fmgr_sql(PG_FUNCTION_ARGS)
                init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
                fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
        }
!       es = fcache->func_state;
  
        /*
         * Convert params to appropriate format if starting a fresh execution. 
(If
         * continuing execution, we can re-use prior params.)
         */
!       if (es && es->status == F_EXEC_START)
                postquel_sub_params(fcache, fcinfo);
  
        /*
--- 679,691 ----
                init_sql_fcache(fcinfo->flinfo, lazyEvalOK);
                fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra;
        }
!       eslist = fcache->func_state;
  
        /*
         * Convert params to appropriate format if starting a fresh execution. 
(If
         * continuing execution, we can re-use prior params.)
         */
!       if (linitial(eslist) && ((execution_state *) linitial(eslist))->status 
== F_EXEC_START)
                postquel_sub_params(fcache, fcinfo);
  
        /*
***************
*** 687,694 **** fmgr_sql(PG_FUNCTION_ARGS)
        /*
         * Find first unfinished query in function.
         */
!       while (es && es->status == F_EXEC_DONE)
!               es = es->next;
  
        /*
         * Execute each command in the function one after another until we 
either
--- 698,715 ----
        /*
         * Find first unfinished query in function.
         */
! 
!       es = NULL; /* keep compiler quiet */
!       foreach(eslc, eslist)
!       {
!               es = (execution_state *) lfirst(eslc);
! 
!               while (es && es->status == F_EXEC_DONE)
!                       es = es->next;
! 
!               if (es)
!                       break;
!       }
  
        /*
         * Execute each command in the function one after another until we 
either
***************
*** 699,705 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 720,746 ----
                bool            completed;
  
                if (es->status == F_EXEC_START)
+               {
+                       if (!fcache->readonly_func)
+                       {
+                               /*
+                                * In a read-only function, use the surrounding 
query's snapshot;
+                                * otherwise take a new snapshot if we don't 
have one yet.  The
+                                * snapshot should include a fresh command ID 
so that all work to
+                                * date in this transaction is visible.
+                                */
+                               if (!fcache->snapshot)
+                               {
+                                       CommandCounterIncrement();
+                                       
PushActiveSnapshot(GetTransactionSnapshot());
+                                       fcache->snapshot = true;
+                               }
+                               else
+                                       UpdateActiveSnapshot();
+                       }
+                       
                        postquel_start(es, fcache);
+               }
  
                completed = postquel_getnext(es, fcache);
  
***************
*** 726,731 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 767,791 ----
                if (es->status != F_EXEC_DONE)
                        break;
                es = es->next;
+ 
+               if (!es)
+               {
+                       eslc = lnext(eslc);
+                       if (!eslc)
+                               break;
+ 
+                       es = (execution_state *) lfirst(eslc);
+ 
+                       /* make sure we take a new snapshot for this query list 
*/
+                       if (!fcache->readonly_func)
+                       {
+                               Assert(fcache->snapshot);
+                               PopActiveSnapshot();
+                               fcache->snapshot = false;
+                       }
+                       else
+                               Assert(!fcache->snapshot);
+               }
        }
  
        /*
***************
*** 794,799 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 854,864 ----
                                                                                
          PointerGetDatum(fcache));
                                fcache->shutdown_reg = false;
                        }
+ 
+                       /* Release snapshot if we have one */
+                       if (fcache->snapshot)
+                               PopActiveSnapshot();
+                       fcache->snapshot = false;
                }
                else
                {
***************
*** 820,825 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 885,895 ----
                                                                                
          PointerGetDatum(fcache));
                                fcache->shutdown_reg = false;
                        }
+ 
+                       /* Release snapshot if we have one */
+                       if (fcache->snapshot)
+                               PopActiveSnapshot();
+                       fcache->snapshot = false;
                }
        }
        else
***************
*** 850,855 **** fmgr_sql(PG_FUNCTION_ARGS)
--- 920,930 ----
  
                /* Clear the tuplestore, but keep it for next time */
                tuplestore_clear(fcache->tstore);
+ 
+               /* Release snapshot if we have one */
+               if (fcache->snapshot)
+                       PopActiveSnapshot();
+               fcache->snapshot = false;
        }
  
        /*
***************
*** 858,868 **** fmgr_sql(PG_FUNCTION_ARGS)
         */
        if (es == NULL)
        {
!               es = fcache->func_state;
!               while (es)
                {
!                       es->status = F_EXEC_START;
!                       es = es->next;
                }
        }
  
--- 933,946 ----
         */
        if (es == NULL)
        {
!               foreach(eslc, fcache->func_state)
                {
!                       es = (execution_state *) lfirst(eslc);
!                       while (es)
!                       {
!                               es->status = F_EXEC_START;
!                               es = es->next;
!                       }
                }
        }
  
***************
*** 913,931 **** sql_exec_error_callback(void *arg)
        {
                execution_state *es;
                int                     query_num;
  
!               es = fcache->func_state;
                query_num = 1;
!               while (es)
                {
!                       if (es->qd)
                        {
!                               errcontext("SQL function \"%s\" statement %d",
!                                                  fcache->fname, query_num);
!                               break;
                        }
-                       es = es->next;
-                       query_num++;
                }
                if (es == NULL)
                {
--- 991,1014 ----
        {
                execution_state *es;
                int                     query_num;
+               ListCell   *lc;
  
!               es = NULL; /* keep compiler quiet */
                query_num = 1;
!               foreach(lc, fcache->func_state)
                {
!                       es = (execution_state *) lfirst(lc);
!                       while (es)
                        {
!                               if (es->qd)
!                               {
!                                       errcontext("SQL function \"%s\" 
statement %d",
!                                                          fcache->fname, 
query_num);
!                                       break;
!                               }
!                               es = es->next;
!                               query_num++;
                        }
                }
                if (es == NULL)
                {
***************
*** 956,973 **** static void
  ShutdownSQLFunction(Datum arg)
  {
        SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
!       execution_state *es = fcache->func_state;
  
!       while (es != NULL)
        {
!               /* Shut down anything still running */
!               if (es->status == F_EXEC_RUN)
!                       postquel_end(es);
!               /* Reset states to START in case we're called again */
!               es->status = F_EXEC_START;
!               es = es->next;
        }
  
        /* Release tuplestore if we have one */
        if (fcache->tstore)
                tuplestore_end(fcache->tstore);
--- 1039,1067 ----
  ShutdownSQLFunction(Datum arg)
  {
        SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg);
!       execution_state *es;
!       ListCell                *lc;
  
!       foreach(lc, fcache->func_state)
        {
!               es = (execution_state *) lfirst(lc);
! 
!               while (es)
!               {
!                       /* Shut down anything still running */
!                       if (es->status == F_EXEC_RUN)
!                               postquel_end(es);
!                       /* Reset states to START in case we're called again */
!                       es->status = F_EXEC_START;
!                       es = es->next;
!               }
        }
  
+       /* Release snapshot if we have one */
+       if (fcache->snapshot)
+               PopActiveSnapshot();
+       fcache->snapshot = false;
+ 
        /* Release tuplestore if we have one */
        if (fcache->tstore)
                tuplestore_end(fcache->tstore);
*** a/src/backend/executor/spi.c
--- b/src/backend/executor/spi.c
***************
*** 1768,1774 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        Oid                     my_lastoid = InvalidOid;
        SPITupleTable *my_tuptable = NULL;
        int                     res = 0;
!       bool            have_active_snap = ActiveSnapshotSet();
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
--- 1768,1774 ----
        Oid                     my_lastoid = InvalidOid;
        SPITupleTable *my_tuptable = NULL;
        int                     res = 0;
!       bool            use_single_snap;
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
***************
*** 1781,1786 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1781,1798 ----
        spierrcontext.previous = error_context_stack;
        error_context_stack = &spierrcontext;
  
+       if (read_only || snapshot != InvalidSnapshot)
+       {
+               if (snapshot != InvalidSnapshot)
+                       PushActiveSnapshot(snapshot);
+               else if (read_only && ActiveSnapshotSet())
+                       PushActiveSnapshot(GetActiveSnapshot());
+ 
+               use_single_snap = true;
+       }
+       else
+               use_single_snap = false;
+ 
        foreach(lc1, plan->plancache_list)
        {
                CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1);
***************
*** 1802,1813 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                        stmt_list = plansource->plan->stmt_list;
                }
  
                foreach(lc2, stmt_list)
                {
                        Node       *stmt = (Node *) lfirst(lc2);
                        bool            canSetTag;
                        DestReceiver *dest;
-                       bool            pushed_active_snap = false;
  
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
--- 1814,1827 ----
                        stmt_list = plansource->plan->stmt_list;
                }
  
+               if (!use_single_snap)
+                       PushActiveSnapshot(GetTransactionSnapshot());
+ 
                foreach(lc2, stmt_list)
                {
                        Node       *stmt = (Node *) lfirst(lc2);
                        bool            canSetTag;
                        DestReceiver *dest;
  
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
***************
*** 1848,1895 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  
                        /*
                         * If not read-only mode, advance the command counter 
before each
!                        * command.
                         */
                        if (!read_only)
                                CommandCounterIncrement();
  
                        dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
  
-                       if (snapshot == InvalidSnapshot)
-                       {
-                               /*
-                                * Default read_only behavior is to use the 
entry-time
-                                * ActiveSnapshot, if any; if read-write, grab 
a full new
-                                * snap.
-                                */
-                               if (read_only)
-                               {
-                                       if (have_active_snap)
-                                       {
-                                               
PushActiveSnapshot(GetActiveSnapshot());
-                                               pushed_active_snap = true;
-                                       }
-                               }
-                               else
-                               {
-                                       
PushActiveSnapshot(GetTransactionSnapshot());
-                                       pushed_active_snap = true;
-                               }
-                       }
-                       else
-                       {
-                               /*
-                                * We interpret read_only with a specified snapshot to be
-                                * exactly that snapshot, but read-write means use the snap
-                                * with advancing of command ID.
-                               if (read_only)
-                                       PushActiveSnapshot(snapshot);
-                               else
-                                       PushUpdatedSnapshot(snapshot);
-                               pushed_active_snap = true;
-                       }
- 
                        if (IsA(stmt, PlannedStmt) &&
                                ((PlannedStmt *) stmt)->utilityStmt == NULL)
                        {
--- 1862,1877 ----
  
                        /*
                         * If not read-only mode, advance the command counter before each
!                        * command and update the snapshot.
                         */
                        if (!read_only)
+                       {
                                CommandCounterIncrement();
+                               UpdateActiveSnapshot();
+                       }
  
                        dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
  
                        if (IsA(stmt, PlannedStmt) &&
                                ((PlannedStmt *) stmt)->utilityStmt == NULL)
                        {
***************
*** 1925,1933 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                res = SPI_OK_UTILITY;
                        }
  
-                       if (pushed_active_snap)
-                               PopActiveSnapshot();
- 
                        /*
                         * The last canSetTag query sets the status values returned to the
                         * caller.      Be careful to free any tuptables not returned, to
--- 1907,1912 ----
***************
*** 1970,1975 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
--- 1949,1958 ----
  
  fail:
  
+       /* Pop the last snapshot off the stack if we pushed one */
+       if (!read_only || ActiveSnapshotSet())
+               PopActiveSnapshot();
+ 
        /* We no longer need the cached plan refcount, if any */
        if (cplan)
                ReleaseCachedPlan(cplan, true);
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 505,552 **** client_read_ended(void)
        }
  }
  
- 
- /*
-  * Parse a query string and pass it through the rewriter.
-  *
-  * A list of Query nodes is returned, since the string might contain
-  * multiple queries and/or the rewriter might expand one query to several.
-  *
-  * NOTE: this routine is no longer used for processing interactive queries,
-  * but it is still needed for parsing of SQL function bodies.
-  */
- List *
- pg_parse_and_rewrite(const char *query_string,        /* string to execute */
-                                        Oid *paramTypes,       /* parameter types */
-                                        int numParams)         /* number of parameters */
- {
-       List       *raw_parsetree_list;
-       List       *querytree_list;
-       ListCell   *list_item;
- 
-       /*
-        * (1) parse the request string into a list of raw parse trees.
-        */
-       raw_parsetree_list = pg_parse_query(query_string);
- 
-       /*
-        * (2) Do parse analysis and rule rewrite.
-        */
-       querytree_list = NIL;
-       foreach(list_item, raw_parsetree_list)
-       {
-               Node       *parsetree = (Node *) lfirst(list_item);
- 
-               querytree_list = list_concat(querytree_list,
-                                                                        pg_analyze_and_rewrite(parsetree,
-                                                                                                                       query_string,
-                                                                                                                       paramTypes,
-                                                                                                                       numParams));
-       }
- 
-       return querytree_list;
- }
- 
  /*
   * Do raw parsing (only).
   *
--- 505,510 ----
*** a/src/backend/tcop/pquery.c
--- b/src/backend/tcop/pquery.c
***************
*** 170,180 **** ProcessQuery(PlannedStmt *plan,
        elog(DEBUG3, "ProcessQuery");
  
        /*
-        * Must always set a snapshot for plannable queries.
-        */
-       PushActiveSnapshot(GetTransactionSnapshot());
- 
-       /*
         * Create the QueryDesc object
         */
        queryDesc = CreateQueryDesc(plan, sourceText,
--- 170,175 ----
***************
*** 234,241 **** ProcessQuery(PlannedStmt *plan,
        /* Now take care of any queued AFTER triggers */
        AfterTriggerEndQuery(queryDesc->estate);
  
-       PopActiveSnapshot();
- 
        /*
         * Now, we close down all the scans and free allocated resources.
         */
--- 229,234 ----
***************
*** 1220,1225 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1213,1219 ----
                           char *completionTag)
  {
        ListCell   *stmtlist_item;
+       bool            pushed_snap = false;
  
        /*
         * If the destination is DestRemoteExecute, change to DestNone.  The
***************
*** 1262,1267 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1256,1270 ----
                        if (log_executor_stats)
                                ResetUsage();
  
+                       /* if no snapshot is set, grab a new one */
+                       if (!pushed_snap)
+                       {
+                               PushActiveSnapshot(GetTransactionSnapshot());
+                               pushed_snap = true;
+                       }
+                       else
+                               UpdateActiveSnapshot();
+ 
                        if (pstmt->canSetTag)
                        {
                                /* statement can set tag string */
***************
*** 1291,1301 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1294,1318 ----
                         *
                         * These are assumed canSetTag if they're the only stmt in the
                         * portal.
+                        *
+                        * NotifyStmt is the only utility statement allowed in a list of
+                        * rewritten queries, and it doesn't need a snapshot so we don't
+                        * need to worry about it.  However, if the list has only one
+                        * statement and it's a utility statement, we are not allowed to
+                        * take a snapshot.  See the first comment in PortalRunUtility().
                         */
                        if (list_length(portal->stmts) == 1)
+                       {
+                               Assert(!pushed_snap);
+ 
                                PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag);
+                       }
                        else
+                       {
+                               Assert(IsA(stmt, NotifyStmt));
+ 
                                PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL);
+                       }
                }
  
                /*
***************
*** 1313,1318 **** PortalRunMulti(Portal portal, bool isTopLevel,
--- 1330,1338 ----
                MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
        }
  
+       if (pushed_snap)
+               PopActiveSnapshot();
+ 
        /*
         * If a command completion tag was supplied, use it.  Otherwise use the
         * portal's commandTag as the default completion tag.
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
***************
*** 305,313 **** PushUpdatedSnapshot(Snapshot snapshot)
         * elsewhere, so make a new copy to scribble on.
         */
        newsnap = CopySnapshot(snapshot);
-       newsnap->curcid = GetCurrentCommandId(false);
  
        PushActiveSnapshot(newsnap);
  }
  
  /*
--- 305,326 ----
         * elsewhere, so make a new copy to scribble on.
         */
        newsnap = CopySnapshot(snapshot);
  
        PushActiveSnapshot(newsnap);
+       UpdateActiveSnapshot();
+ }
+ 
+ /*
+  * UpdateActiveSnapshot
+  *
+  * Update the current CID of the active snapshot.
+  */
+ void
+ UpdateActiveSnapshot(void)
+ {
+       Assert(ActiveSnapshotSet());
+ 
+       ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
  }
  
  /*
*** a/src/include/tcop/tcopprot.h
--- b/src/include/tcop/tcopprot.h
***************
*** 45,52 **** typedef enum
  
  extern int    log_statement;
  
- extern List *pg_parse_and_rewrite(const char *query_string,
-                                        Oid *paramTypes, int numParams);
  extern List *pg_parse_query(const char *query_string);
  extern List *pg_analyze_and_rewrite(Node *parsetree, const char *query_string,
                                           Oid *paramTypes, int numParams);
--- 45,50 ----
*** a/src/include/utils/snapmgr.h
--- b/src/include/utils/snapmgr.h
***************
*** 29,34 **** extern void SnapshotSetCommandId(CommandId curcid);
--- 29,35 ----
  
  extern void PushActiveSnapshot(Snapshot snapshot);
  extern void PushUpdatedSnapshot(Snapshot snapshot);
+ extern void UpdateActiveSnapshot(void);
  extern void PopActiveSnapshot(void);
  extern Snapshot GetActiveSnapshot(void);
  extern bool ActiveSnapshotSet(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to