This is the patch I am currently playing with, to keep track of the
live snapshots within a transaction.

There are some obvious bugs left, and it raises certain questions.  The
most obvious bug is that it does not track SerializableSnapshot in a
serializable transaction.

The main question I currently have is how to deal with ActiveSnapshot.
Currently, in most places it is simply registered like any other
snapshot, and then unregistered.  For the most part this works fine, but
I wonder if it's not just a bug waiting to happen.  Since it is a
global variable and other places in the code feel free to assign to it,
it is possible that some code sets it, then calls some other code that
sets it to a different snapshot and returns without restoring it.  On
exit, the first setter would then try to unregister a snapshot that it
didn't register (and leak the one it did).
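To make that scenario concrete, here is a toy model in plain C, not backend code.  ToySnapshot, toy_register(), toy_unregister() and toy_active are hypothetical stand-ins for a registered snapshot, RegisterSnapshot(), UnregisterSnapshot() and ActiveSnapshot, keeping only a reference count.  It assumes that unregistering an already-released snapshot is detectable; the real snapmgr may behave differently.

```c
#include <assert.h>

/* Toy stand-in for a registered snapshot: only the reference count matters. */
typedef struct ToySnapshot
{
    const char *name;
    int         regd_count;     /* number of outstanding registrations */
} ToySnapshot;

/* Global pointer playing the role of ActiveSnapshot (hypothetical). */
static ToySnapshot *toy_active = 0;

static ToySnapshot *
toy_register(ToySnapshot *snap)
{
    snap->regd_count++;
    return snap;
}

/* Returns 0 on success, -1 if the snapshot was not registered (a bug). */
static int
toy_unregister(ToySnapshot *snap)
{
    if (snap->regd_count <= 0)
        return -1;
    snap->regd_count--;
    return 0;
}

static ToySnapshot snap_a = {"A", 0};
static ToySnapshot snap_b = {"B", 0};

/* Inner code: points the global at its own snapshot and never restores it. */
static void
toy_inner(void)
{
    toy_active = toy_register(&snap_b);
    /* ... work ... */
    (void) toy_unregister(toy_active);  /* balanced for B, global left on B */
}

/* Outer code: the register/unregister-the-global pattern. */
static int
toy_outer(void)
{
    toy_active = toy_register(&snap_a);
    toy_inner();
    /* Unregisters whatever the global points to now: B, not A. */
    return toy_unregister(toy_active);
}
```

toy_outer() ends up unregistering B a second time (returning -1), while A keeps a registration that nobody will ever release.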

So the question is, should we deal with ActiveSnapshot in some different
way?  For example, snapmgr could know about it explicitly, perhaps by
keeping a stack, and callers would call a routine in snapmgr saying "I
need this snap to be ActiveSnapshot" (push), and later "I no longer need
ActiveSnapshot" (pop).  On error, we can just forget about the part of
the stack that corresponds to the current subxact.  This would allow us
to remove some PG_TRY blocks.
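A rough sketch of the push/pop idea, again as a standalone toy rather than proposed snapmgr code.  toy_push_active(), toy_pop_active(), toy_get_active() and toy_at_subabort() are hypothetical names; the nesting level passed in would presumably come from GetCurrentTransactionNestLevel() in the backend, and registration of the pushed snapshot is left out.

```c
#include <assert.h>
#include <stddef.h>

typedef struct ToySnapshot
{
    int         id;
} ToySnapshot;

/* One entry per push; remembers the subtransaction level that pushed it. */
typedef struct ActiveSnapshotElt
{
    ToySnapshot *snap;
    int          xact_level;
} ActiveSnapshotElt;

#define MAX_ACTIVE 64

static ActiveSnapshotElt active_stack[MAX_ACTIVE];
static int  active_depth = 0;

/* The "current" ActiveSnapshot is simply the top of the stack. */
static ToySnapshot *
toy_get_active(void)
{
    return active_depth > 0 ? active_stack[active_depth - 1].snap : NULL;
}

/* "I need this snap to be ActiveSnapshot" */
static void
toy_push_active(ToySnapshot *snap, int xact_level)
{
    assert(active_depth < MAX_ACTIVE);
    active_stack[active_depth].snap = snap;
    active_stack[active_depth].xact_level = xact_level;
    active_depth++;
}

/* "I no longer need ActiveSnapshot" */
static void
toy_pop_active(void)
{
    assert(active_depth > 0);
    active_depth--;
}

/*
 * On subtransaction abort, forget every entry pushed at this nesting level
 * or deeper, so callers need no PG_TRY block just to restore the global.
 */
static void
toy_at_subabort(int xact_level)
{
    while (active_depth > 0 &&
           active_stack[active_depth - 1].xact_level >= xact_level)
        active_depth--;
}
```

For example, after the outer level pushes A and a subtransaction at level 2 pushes B and C, toy_at_subabort(2) truncates the stack back to A; the normal path still pops explicitly.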

The other question is about CommitTransactionCommand.  Currently my
EOXact routine barfs on every snapshot still registered at main
transaction commit, since that is a leak.  I see this as a good thing;
however, it forced me to be more meticulous about unregistering
ActiveSnapshot before each intermediate commit in commands that span
multiple transactions, like VACUUM, multitable CLUSTER and CREATE INDEX
CONCURRENTLY.
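The commit-time check, and why it forces multi-transaction commands to clean up before each intermediate commit, can be modeled roughly as below.  toy_at_eoxact() is only meant to resemble the AtEOXact_Snapshot(isCommit) calls the patch adds to xact.c; it assumes that on abort leftover registrations are discarded without complaint, and a backend version would presumably emit a WARNING rather than return a count.  All names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical registry state: registrations outstanding in this transaction. */
static int registered_snapshots = 0;

static void
toy_register(void)
{
    registered_snapshots++;
}

static void
toy_unregister(void)
{
    assert(registered_snapshots > 0);
    registered_snapshots--;
}

/*
 * End-of-transaction check.  On commit, anything still registered is a leak
 * and is reported (here: returned as a count).  On abort, error paths will
 * have skipped their unregister calls, so leftovers are dropped silently.
 * Either way the next transaction starts with an empty registry.
 */
static int
toy_at_eoxact(bool is_commit)
{
    int         leaked = is_commit ? registered_snapshots : 0;

    registered_snapshots = 0;
    return leaked;
}

/*
 * Shape of one iteration of a multi-transaction command such as VACUUM:
 * the per-relation snapshot must be released before the commit, or the
 * commit-time check fires.  Returns the number of leaks reported.
 */
static int
toy_one_relation(bool unregister_before_commit)
{
    toy_register();             /* functions in indexes may want a snapshot */
    /* ... process one relation ... */
    if (unregister_before_commit)
        toy_unregister();
    return toy_at_eoxact(true); /* stands in for CommitTransactionCommand() */
}
```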

(Hmm, maybe this last point is also saying that we should be dealing
with ActiveSnapshot explicitly in snapmgr.)


And finally, the last question is about some callers like COPY or
EXPLAIN that get a snapshot (ActiveSnapshot or a different one), then
modify Snapshot->curcid.  I'm not sure this is a problem, really, but
just to be careful I made them create a copy and register the copy.
The problem I have with it is that snapshots are allocated (copied) only
once, when they are first registered: if somebody registers the same
snapshot later, we only increment the reference count and return the
pointer to the previously stored snapshot.  So if COPY were handed a
snapshot that had been previously stored and modified it, it would
mangle the stored copy, and the other user could get an invalid view of
the database.  I'm not sure this is really a problem, because how could
COPY get the same snapshot as anybody else?  Still, it gives me a bad
feeling.
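The aliasing hazard can be shown with a minimal registry that copies on first registration and shares the stored copy afterwards.  This assumes that "the same snapshot" means a pointer the registry has already handed out (as when ActiveSnapshot, itself registered, is registered again); toy_register(), the fixed-size table and the demo function are hypothetical, and unregistration is omitted for brevity.

```c
#include <assert.h>
#include <stdlib.h>

typedef unsigned int CommandId;

/* Toy snapshot: only the field COPY and EXPLAIN modify is modeled. */
typedef struct ToySnapshot
{
    CommandId   curcid;
} ToySnapshot;

/* One registry slot: the stored private copy and its reference count. */
typedef struct RegEntry
{
    ToySnapshot *copy;
    int          refcount;
} RegEntry;

#define MAX_REG 16

static RegEntry registry[MAX_REG];
static int  nregistered = 0;

/*
 * If the pointer is one of our stored copies, bump its refcount and hand
 * the same copy back; otherwise allocate the one and only private copy.
 */
static ToySnapshot *
toy_register(const ToySnapshot *snap)
{
    for (int i = 0; i < nregistered; i++)
    {
        if (registry[i].copy == snap)
        {
            registry[i].refcount++;
            return registry[i].copy;
        }
    }

    assert(nregistered < MAX_REG);
    RegEntry   *e = &registry[nregistered++];

    e->copy = malloc(sizeof(ToySnapshot));
    assert(e->copy != NULL);
    *e->copy = *snap;
    e->refcount = 1;
    return e->copy;
}

/* The hazard: returns the curcid seen by the first holder afterwards. */
static CommandId
toy_shared_mutation_demo(void)
{
    ToySnapshot base = {5};
    ToySnapshot *active = toy_register(&base);  /* e.g. ActiveSnapshot */
    ToySnapshot *mine = toy_register(active);   /* same snapshot, registered again */

    mine->curcid = 9;           /* a "private" command ID bump ... */
    return active->curcid;      /* ... is visible to the other holder */
}
```

Copying first and registering the modified copy, as the patch does in the COPY, EXPLAIN and SPI hunks, always yields a fresh stored snapshot, so the change stays private.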


(Hmm ... it just occurred to me that perhaps it would be better to
improve the interaction between GetSnapshotData and RegisterSnapshot, to
avoid some palloc traffic.)

-- 
Alvaro Herrera                                http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
Index: src/backend/access/transam/xact.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/xact.c,v
retrieving revision 1.262
diff -c -p -r1.262 xact.c
*** src/backend/access/transam/xact.c	26 Mar 2008 18:48:59 -0000	1.262
--- src/backend/access/transam/xact.c	27 Mar 2008 13:16:15 -0000
*************** CommitTransaction(void)
*** 1752,1757 ****
--- 1752,1758 ----
  	AtEOXact_ComboCid();
  	AtEOXact_HashTables(true);
  	AtEOXact_PgStat(true);
+ 	AtEOXact_Snapshot(true);
  	pgstat_report_xact_timestamp(0);
  
  	CurrentResourceOwner = NULL;
*************** PrepareTransaction(void)
*** 1984,1989 ****
--- 1985,1991 ----
  	AtEOXact_ComboCid();
  	AtEOXact_HashTables(true);
  	/* don't call AtEOXact_PgStat here */
+ 	AtEOXact_Snapshot(true);
  
  	CurrentResourceOwner = NULL;
  	ResourceOwnerDelete(TopTransactionResourceOwner);
*************** AbortTransaction(void)
*** 2128,2133 ****
--- 2130,2136 ----
  	AtEOXact_ComboCid();
  	AtEOXact_HashTables(false);
  	AtEOXact_PgStat(false);
+ 	AtEOXact_Snapshot(false);
  	pgstat_report_xact_timestamp(0);
  
  	/*
*************** CommitSubTransaction(void)
*** 3806,3811 ****
--- 3809,3815 ----
  	AtSubCommit_Notify();
  	AtEOSubXact_UpdateFlatFiles(true, s->subTransactionId,
  								s->parent->subTransactionId);
+ 	AtSubCommit_Snapshot();
  
  	CallSubXactCallbacks(SUBXACT_EVENT_COMMIT_SUB, s->subTransactionId,
  						 s->parent->subTransactionId);
*************** AbortSubTransaction(void)
*** 3926,3931 ****
--- 3930,3936 ----
  		AtSubAbort_Notify();
  		AtEOSubXact_UpdateFlatFiles(false, s->subTransactionId,
  									s->parent->subTransactionId);
+ 		AtSubAbort_Snapshot();
  
  		/* Advertise the fact that we aborted in pg_clog. */
  		(void) RecordTransactionAbort(true);
Index: src/backend/catalog/index.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/catalog/index.c,v
retrieving revision 1.296
diff -c -p -r1.296 index.c
*** src/backend/catalog/index.c	26 Mar 2008 21:10:37 -0000	1.296
--- src/backend/catalog/index.c	27 Mar 2008 13:16:15 -0000
*************** IndexBuildHeapScan(Relation heapRelation
*** 1466,1471 ****
--- 1466,1472 ----
  	TransactionId OldestXmin;
  	BlockNumber root_blkno = InvalidBlockNumber;
  	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
+ 	bool		unreg_snapshot = false;
  
  	/*
  	 * sanity checks
*************** IndexBuildHeapScan(Relation heapRelation
*** 1502,1508 ****
  	}
  	else if (indexInfo->ii_Concurrent)
  	{
! 		snapshot = CopySnapshot(GetTransactionSnapshot());
  		OldestXmin = InvalidTransactionId;		/* not used */
  	}
  	else
--- 1503,1510 ----
  	}
  	else if (indexInfo->ii_Concurrent)
  	{
! 		snapshot = RegisterSnapshot(GetTransactionSnapshot());
! 		unreg_snapshot = true;
  		OldestXmin = InvalidTransactionId;		/* not used */
  	}
  	else
*************** IndexBuildHeapScan(Relation heapRelation
*** 1787,1792 ****
--- 1789,1796 ----
  	}
  
  	heap_endscan(scan);
+ 	if (unreg_snapshot)
+ 		UnregisterSnapshot(snapshot);
  
  	ExecDropSingleTupleTableSlot(slot);
  
Index: src/backend/commands/cluster.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/cluster.c,v
retrieving revision 1.172
diff -c -p -r1.172 cluster.c
*** src/backend/commands/cluster.c	26 Mar 2008 21:10:37 -0000	1.172
--- src/backend/commands/cluster.c	27 Mar 2008 13:16:15 -0000
*************** cluster(ClusterStmt *stmt, bool isTopLev
*** 211,216 ****
--- 211,217 ----
  		rvs = get_tables_to_cluster(cluster_context);
  
  		/* Commit to get out of starting transaction */
+ 		UnregisterSnapshot(ActiveSnapshot);
  		CommitTransactionCommand();
  
  		/* Ok, now that we've got them all, cluster them one by one */
*************** cluster(ClusterStmt *stmt, bool isTopLev
*** 221,228 ****
  			/* Start a new transaction for each relation. */
  			StartTransactionCommand();
  			/* functions in indexes may want a snapshot set */
! 			ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  			cluster_rel(rvtc, true);
  			CommitTransactionCommand();
  		}
  
--- 222,230 ----
  			/* Start a new transaction for each relation. */
  			StartTransactionCommand();
  			/* functions in indexes may want a snapshot set */
! 			ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  			cluster_rel(rvtc, true);
+ 			UnregisterSnapshot(ActiveSnapshot);
  			CommitTransactionCommand();
  		}
  
Index: src/backend/commands/copy.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.298
diff -c -p -r1.298 copy.c
*** src/backend/commands/copy.c	26 Mar 2008 18:48:59 -0000	1.298
--- src/backend/commands/copy.c	27 Mar 2008 13:16:15 -0000
*************** DoCopy(const CopyStmt *stmt, const char 
*** 1003,1008 ****
--- 1003,1009 ----
  		Query	   *query;
  		PlannedStmt *plan;
  		DestReceiver *dest;
+ 		Snapshot	snap;
  
  		Assert(!is_from);
  		cstate->rel = NULL;
*************** DoCopy(const CopyStmt *stmt, const char 
*** 1044,1056 ****
  		plan = planner(query, 0, NULL);
  
  		/*
! 		 * Update snapshot command ID to ensure this query sees results of any
! 		 * previously executed queries.  (It's a bit cheesy to modify
! 		 * ActiveSnapshot without making a copy, but for the limited ways in
! 		 * which COPY can be invoked, I think it's OK, because the active
! 		 * snapshot shouldn't be shared with anything else anyway.)
  		 */
! 		ActiveSnapshot->curcid = GetCurrentCommandId(false);
  
  		/* Create dest receiver for COPY OUT */
  		dest = CreateDestReceiver(DestCopyOut, NULL);
--- 1045,1056 ----
  		plan = planner(query, 0, NULL);
  
  		/*
! 		 * Use a snapshot with updated command ID to ensure this query sees
! 		 * results of any previously executed queries.  We don't register
! 		 * the snapshot here -- CreateQueryDesc does it.
  		 */
! 		snap = CopySnapshot(ActiveSnapshot);
! 		snap->curcid = GetCurrentCommandId(false);
  
  		/* Create dest receiver for COPY OUT */
  		dest = CreateDestReceiver(DestCopyOut, NULL);
*************** DoCopy(const CopyStmt *stmt, const char 
*** 1058,1065 ****
  
  		/* Create a QueryDesc requesting no output */
  		cstate->queryDesc = CreateQueryDesc(plan,
! 											ActiveSnapshot, InvalidSnapshot,
  											dest, NULL, false);
  
  		/*
  		 * Call ExecutorStart to prepare the plan for execution.
--- 1058,1066 ----
  
  		/* Create a QueryDesc requesting no output */
  		cstate->queryDesc = CreateQueryDesc(plan,
! 											snap, InvalidSnapshot,
  											dest, NULL, false);
+ 		FreeSnapshot(snap);
  
  		/*
  		 * Call ExecutorStart to prepare the plan for execution.
Index: src/backend/commands/explain.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/explain.c,v
retrieving revision 1.171
diff -c -p -r1.171 explain.c
*** src/backend/commands/explain.c	26 Mar 2008 18:48:59 -0000	1.171
--- src/backend/commands/explain.c	27 Mar 2008 13:17:43 -0000
*************** ExplainOnePlan(PlannedStmt *plannedstmt,
*** 226,246 ****
  	ExplainState *es;
  	StringInfoData buf;
  	int			eflags;
  
  	/*
! 	 * Update snapshot command ID to ensure this query sees results of any
! 	 * previously executed queries.  (It's a bit cheesy to modify
! 	 * ActiveSnapshot without making a copy, but for the limited ways in which
! 	 * EXPLAIN can be invoked, I think it's OK, because the active snapshot
! 	 * shouldn't be shared with anything else anyway.)
  	 */
! 	ActiveSnapshot->curcid = GetCurrentCommandId(false);
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt,
! 								ActiveSnapshot, InvalidSnapshot,
  								None_Receiver, params,
  								stmt->analyze);
  
  	INSTR_TIME_SET_CURRENT(starttime);
  
--- 226,246 ----
  	ExplainState *es;
  	StringInfoData buf;
  	int			eflags;
+ 	Snapshot	snap;
  
  	/*
! 	 * Use a snapshot with an updated command ID to ensure this query sees
! 	 * results of any previously executed queries.
  	 */
! 	snap = CopySnapshot(ActiveSnapshot);
! 	snap->curcid = GetCurrentCommandId(false);
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt,
! 								snap, InvalidSnapshot,
  								None_Receiver, params,
  								stmt->analyze);
+ 	FreeSnapshot(snap);
  
  	INSTR_TIME_SET_CURRENT(starttime);
  
Index: src/backend/commands/indexcmds.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/indexcmds.c,v
retrieving revision 1.174
diff -c -p -r1.174 indexcmds.c
*** src/backend/commands/indexcmds.c	26 Mar 2008 21:10:37 -0000	1.174
--- src/backend/commands/indexcmds.c	27 Mar 2008 13:16:15 -0000
*************** DefineIndex(RangeVar *heapRelation,
*** 483,488 ****
--- 483,491 ----
  	 */
  	LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
  
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	CommitTransactionCommand();
  	StartTransactionCommand();
  
*************** DefineIndex(RangeVar *heapRelation,
*** 541,547 ****
  	indexRelation = index_open(indexRelationId, RowExclusiveLock);
  
  	/* Set ActiveSnapshot since functions in the indexes may need it */
! 	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/* We have to re-build the IndexInfo struct, since it was lost in commit */
  	indexInfo = BuildIndexInfo(indexRelation);
--- 544,550 ----
  	indexRelation = index_open(indexRelationId, RowExclusiveLock);
  
  	/* Set ActiveSnapshot since functions in the indexes may need it */
! 	ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  
  	/* We have to re-build the IndexInfo struct, since it was lost in commit */
  	indexInfo = BuildIndexInfo(indexRelation);
*************** DefineIndex(RangeVar *heapRelation,
*** 580,585 ****
--- 583,592 ----
  
  	heap_close(pg_index, RowExclusiveLock);
  
+ 	/* we can do away with our snapshot */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	/*
  	 * Commit this transaction to make the indisready update visible.
  	 */
*************** DefineIndex(RangeVar *heapRelation,
*** 615,621 ****
  	 * We also set ActiveSnapshot to this snap, since functions in indexes may
  	 * need a snapshot.
  	 */
! 	snapshot = CopySnapshot(GetTransactionSnapshot());
  	ActiveSnapshot = snapshot;
  
  	/*
--- 622,628 ----
  	 * We also set ActiveSnapshot to this snap, since functions in indexes may
  	 * need a snapshot.
  	 */
! 	snapshot = RegisterSnapshot(GetTransactionSnapshot());
  	ActiveSnapshot = snapshot;
  
  	/*
*************** DefineIndex(RangeVar *heapRelation,
*** 624,629 ****
--- 631,642 ----
  	validate_index(relationId, indexRelationId, snapshot);
  
  	/*
+ 	 * Note: we no longer need the validating snapshot, but we
+ 	 * still need ActiveSnapshot, so we'll call UnregisterSnapshot once at the
+ 	 * end.
+ 	 */
+ 
+ 	/*
  	 * The index is now valid in the sense that it contains all currently
  	 * interesting tuples.	But since it might not contain tuples deleted just
  	 * before the reference snap was taken, we have to wait out any
*************** DefineIndex(RangeVar *heapRelation,
*** 685,690 ****
--- 698,707 ----
  	 */
  	CacheInvalidateRelcacheByRelid(heaprelid.relId);
  
+ 	/* we can now unregister ActiveSnapshot */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	/*
  	 * Last thing to do is release the session-level lock on the parent table.
  	 */
*************** ReindexDatabase(const char *databaseName
*** 1452,1457 ****
--- 1469,1476 ----
  	heap_close(relationRelation, AccessShareLock);
  
  	/* Now reindex each rel in a separate transaction */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
  	CommitTransactionCommand();
  	foreach(l, relids)
  	{
*************** ReindexDatabase(const char *databaseName
*** 1459,1469 ****
  
  		StartTransactionCommand();
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  		if (reindex_relation(relid, true))
  			ereport(NOTICE,
  					(errmsg("table \"%s\" was reindexed",
  							get_rel_name(relid))));
  		CommitTransactionCommand();
  	}
  	StartTransactionCommand();
--- 1478,1490 ----
  
  		StartTransactionCommand();
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  		if (reindex_relation(relid, true))
  			ereport(NOTICE,
  					(errmsg("table \"%s\" was reindexed",
  							get_rel_name(relid))));
+ 		UnregisterSnapshot(ActiveSnapshot);
+ 		ActiveSnapshot = NULL;
  		CommitTransactionCommand();
  	}
  	StartTransactionCommand();
Index: src/backend/commands/trigger.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.230
diff -c -p -r1.230 trigger.c
*** src/backend/commands/trigger.c	26 Mar 2008 21:10:38 -0000	1.230
--- src/backend/commands/trigger.c	27 Mar 2008 13:16:15 -0000
*************** void
*** 2884,2889 ****
--- 2884,2890 ----
  AfterTriggerFireDeferred(void)
  {
  	AfterTriggerEventList *events;
+ 	bool		unreg_snapshot = false;
  
  	/* Must be inside a transaction */
  	Assert(afterTriggers != NULL);
*************** AfterTriggerFireDeferred(void)
*** 2898,2904 ****
  	 */
  	events = &afterTriggers->events;
  	if (events->head != NULL)
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Run all the remaining triggers.	Loop until they are all gone, in case
--- 2899,2908 ----
  	 */
  	events = &afterTriggers->events;
  	if (events->head != NULL)
! 	{
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
! 		unreg_snapshot = true;
! 	}
  
  	/*
  	 * Run all the remaining triggers.	Loop until they are all gone, in case
*************** AfterTriggerFireDeferred(void)
*** 2911,2916 ****
--- 2915,2926 ----
  		afterTriggerInvokeEvents(events, firing_id, NULL, true);
  	}
  
+ 	if (unreg_snapshot)
+ 	{
+ 		UnregisterSnapshot(ActiveSnapshot);
+ 		ActiveSnapshot = NULL;
+ 	}
+ 
  	Assert(events->head == NULL);
  }
  
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.371
diff -c -p -r1.371 vacuum.c
*** src/backend/commands/vacuum.c	26 Mar 2008 21:10:38 -0000	1.371
--- src/backend/commands/vacuum.c	27 Mar 2008 13:16:15 -0000
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 407,412 ****
--- 407,418 ----
  	 */
  	if (use_own_xacts)
  	{
+ 		/* ActiveSnapshot is not set by autovacuum */
+ 		if (ActiveSnapshot)
+ 		{
+ 			UnregisterSnapshot(ActiveSnapshot);
+ 			ActiveSnapshot = NULL;
+ 		}
  		/* matches the StartTransaction in PostgresMain() */
  		CommitTransactionCommand();
  	}
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 444,450 ****
  				{
  					StartTransactionCommand();
  					/* functions in indexes may want a snapshot set */
! 					ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  				}
  				else
  					old_context = MemoryContextSwitchTo(anl_context);
--- 450,456 ----
  				{
  					StartTransactionCommand();
  					/* functions in indexes may want a snapshot set */
! 					ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  				}
  				else
  					old_context = MemoryContextSwitchTo(anl_context);
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 452,458 ****
--- 458,468 ----
  				analyze_rel(relid, vacstmt, vac_strategy);
  
  				if (use_own_xacts)
+ 				{
+ 					UnregisterSnapshot(ActiveSnapshot);
+ 					ActiveSnapshot = NULL;
  					CommitTransactionCommand();
+ 				}
  				else
  				{
  					MemoryContextSwitchTo(old_context);
*************** vacuum_rel(Oid relid, VacuumStmt *vacstm
*** 979,985 ****
  	if (vacstmt->full)
  	{
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  	else
  	{
--- 989,995 ----
  	if (vacstmt->full)
  	{
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  	}
  	else
  	{
*************** vacuum_rel(Oid relid, VacuumStmt *vacstm
*** 1138,1143 ****
--- 1148,1160 ----
  	/* all done with this class, but hold lock until commit */
  	relation_close(onerel, NoLock);
  
+ 	/* do away with our snapshot */
+ 	if (vacstmt->full)
+ 	{
+ 		UnregisterSnapshot(ActiveSnapshot);
+ 		ActiveSnapshot = NULL;
+ 	}
+ 
  	/*
  	 * Complete the transaction and free all temporary memory used.
  	 */
Index: src/backend/executor/execMain.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/execMain.c,v
retrieving revision 1.304
diff -c -p -r1.304 execMain.c
*** src/backend/executor/execMain.c	26 Mar 2008 21:10:38 -0000	1.304
--- src/backend/executor/execMain.c	27 Mar 2008 14:19:05 -0000
***************
*** 52,57 ****
--- 52,58 ----
  #include "utils/acl.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
+ #include "utils/snapmgr.h"
  #include "utils/tqual.h"
  
  
*************** ExecutorStart(QueryDesc *queryDesc, int 
*** 188,195 ****
  	/*
  	 * Copy other important information into the EState
  	 */
! 	estate->es_snapshot = queryDesc->snapshot;
! 	estate->es_crosscheck_snapshot = queryDesc->crosscheck_snapshot;
  	estate->es_instrument = queryDesc->doInstrument;
  
  	/*
--- 189,196 ----
  	/*
  	 * Copy other important information into the EState
  	 */
! 	estate->es_snapshot = RegisterSnapshot(queryDesc->snapshot);
! 	estate->es_crosscheck_snapshot = RegisterSnapshot(queryDesc->crosscheck_snapshot);
  	estate->es_instrument = queryDesc->doInstrument;
  
  	/*
*************** ExecutorEnd(QueryDesc *queryDesc)
*** 316,321 ****
--- 317,326 ----
  	if (estate->es_select_into)
  		CloseIntoRel(queryDesc);
  
+ 	/* do away with our snapshots */
+ 	UnregisterSnapshot(estate->es_snapshot);
+ 	UnregisterSnapshot(estate->es_crosscheck_snapshot);
+ 
  	/*
  	 * Must switch out of context before destroying it
  	 */
Index: src/backend/executor/functions.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/functions.c,v
retrieving revision 1.124
diff -c -p -r1.124 functions.c
*** src/backend/executor/functions.c	26 Mar 2008 18:48:59 -0000	1.124
--- src/backend/executor/functions.c	27 Mar 2008 13:16:15 -0000
*************** postquel_start(execution_state *es, SQLF
*** 295,309 ****
  	 * In a read-only function, use the surrounding query's snapshot;
  	 * otherwise take a new snapshot for each query.  The snapshot should
  	 * include a fresh command ID so that all work to date in this transaction
! 	 * is visible.	We copy in both cases so that postquel_end can
! 	 * unconditionally do FreeSnapshot.
  	 */
  	if (fcache->readonly_func)
! 		snapshot = CopySnapshot(ActiveSnapshot);
  	else
  	{
  		CommandCounterIncrement();
! 		snapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  
  	if (IsA(es->stmt, PlannedStmt))
--- 295,309 ----
  	 * In a read-only function, use the surrounding query's snapshot;
  	 * otherwise take a new snapshot for each query.  The snapshot should
  	 * include a fresh command ID so that all work to date in this transaction
! 	 * is visible.	No need to create a separate copy; CreateQueryDesc will
! 	 * register it appropriately.
  	 */
  	if (fcache->readonly_func)
! 		snapshot = ActiveSnapshot;
  	else
  	{
  		CommandCounterIncrement();
! 		snapshot = GetTransactionSnapshot();
  	}
  
  	if (IsA(es->stmt, PlannedStmt))
*************** postquel_end(execution_state *es)
*** 425,431 ****
  		ActiveSnapshot = saveActiveSnapshot;
  	}
  
- 	FreeSnapshot(es->qd->snapshot);
  	FreeQueryDesc(es->qd);
  	es->qd = NULL;
  }
--- 425,430 ----
Index: src/backend/executor/spi.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/spi.c,v
retrieving revision 1.191
diff -c -p -r1.191 spi.c
*** src/backend/executor/spi.c	26 Mar 2008 18:48:59 -0000	1.191
--- src/backend/executor/spi.c	27 Mar 2008 13:16:15 -0000
*************** SPI_execp(SPIPlanPtr plan, Datum *Values
*** 365,373 ****
  
  /*
   * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
!  * the caller to specify exactly which snapshots to use.  Also, the caller
!  * may specify that AFTER triggers should be queued as part of the outer
!  * query rather than being fired immediately at the end of the command.
   *
   * This is currently not documented in spi.sgml because it is only intended
   * for use by RI triggers.
--- 365,374 ----
  
  /*
   * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
!  * the caller to specify exactly which snapshots to use, which will be
!  * registered here.  Also, the caller may specify that AFTER triggers should be
!  * queued as part of the outer query rather than being fired immediately at the
!  * end of the command.
   *
   * This is currently not documented in spi.sgml because it is only intended
   * for use by RI triggers.
*************** SPI_cursor_open(const char *name, SPIPla
*** 1028,1034 ****
  	}
  
  	/*
! 	 * Set up the snapshot to use.	(PortalStart will do CopySnapshot, so we
  	 * skip that here.)
  	 */
  	if (read_only)
--- 1029,1035 ----
  	}
  
  	/*
! 	 * Set up the snapshot to use.	(PortalStart will do RegisterSnapshot, so we
  	 * skip that here.)
  	 */
  	if (read_only)
*************** _SPI_execute_plan(SPIPlanPtr plan, Datum
*** 1598,1606 ****
  					 * ActiveSnapshot; if read-write, grab a full new snap.
  					 */
  					if (read_only)
! 						ActiveSnapshot = CopySnapshot(saveActiveSnapshot);
  					else
! 						ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  				}
  				else
  				{
--- 1599,1607 ----
  					 * ActiveSnapshot; if read-write, grab a full new snap.
  					 */
  					if (read_only)
! 						ActiveSnapshot = RegisterSnapshot(saveActiveSnapshot);
  					else
! 						ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  				}
  				else
  				{
*************** _SPI_execute_plan(SPIPlanPtr plan, Datum
*** 1609,1617 ****
  					 * exactly that snapshot, but read-write means use the
  					 * snap with advancing of command ID.
  					 */
! 					ActiveSnapshot = CopySnapshot(snapshot);
! 					if (!read_only)
! 						ActiveSnapshot->curcid = GetCurrentCommandId(false);
  				}
  
  				if (IsA(stmt, PlannedStmt) &&
--- 1610,1626 ----
  					 * exactly that snapshot, but read-write means use the
  					 * snap with advancing of command ID.
  					 */
! 					if (read_only)
! 						ActiveSnapshot = RegisterSnapshot(snapshot);
! 					else
! 					{
! 						Snapshot	snap;
! 
! 						snap = CopySnapshot(snapshot);
! 						snap->curcid = GetCurrentCommandId(false);
! 						ActiveSnapshot = RegisterSnapshot(snap);
! 						FreeSnapshot(snap);
! 					}
  				}
  
  				if (IsA(stmt, PlannedStmt) &&
*************** _SPI_execute_plan(SPIPlanPtr plan, Datum
*** 1641,1647 ****
  						_SPI_current->processed = _SPI_current->tuptable->alloced - _SPI_current->tuptable->free;
  					res = SPI_OK_UTILITY;
  				}
! 				FreeSnapshot(ActiveSnapshot);
  				ActiveSnapshot = NULL;
  
  				/*
--- 1650,1656 ----
  						_SPI_current->processed = _SPI_current->tuptable->alloced - _SPI_current->tuptable->free;
  					res = SPI_OK_UTILITY;
  				}
! 				UnregisterSnapshot(ActiveSnapshot);
  				ActiveSnapshot = NULL;
  
  				/*
Index: src/backend/storage/ipc/procarray.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/procarray.c,v
retrieving revision 1.43
diff -c -p -r1.43 procarray.c
*** src/backend/storage/ipc/procarray.c	26 Mar 2008 18:48:59 -0000	1.43
--- src/backend/storage/ipc/procarray.c	28 Mar 2008 13:42:55 -0000
*************** GetSnapshotData(Snapshot snapshot, bool 
*** 681,691 ****
  
  	Assert(snapshot != NULL);
  
- 	/* Serializable snapshot must be computed before any other... */
- 	Assert(serializable ?
- 		   !TransactionIdIsValid(MyProc->xmin) :
- 		   TransactionIdIsValid(MyProc->xmin));
- 
  	/*
  	 * Allocating space for maxProcs xids is usually overkill; numProcs would
  	 * be sufficient.  But it seems better to do the malloc while not holding
--- 681,686 ----
Index: src/backend/storage/large_object/inv_api.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/large_object/inv_api.c,v
retrieving revision 1.131
diff -c -p -r1.131 inv_api.c
*** src/backend/storage/large_object/inv_api.c	26 Mar 2008 21:10:38 -0000	1.131
--- src/backend/storage/large_object/inv_api.c	27 Mar 2008 13:16:16 -0000
*************** inv_open(Oid lobjId, int flags, MemoryCo
*** 245,256 ****
  	}
  	else if (flags & INV_READ)
  	{
! 		/* be sure to copy snap into mcxt */
! 		MemoryContext oldContext = MemoryContextSwitchTo(mcxt);
! 
! 		retval->snapshot = CopySnapshot(ActiveSnapshot);
  		retval->flags = IFS_RDLOCK;
- 		MemoryContextSwitchTo(oldContext);
  	}
  	else
  		elog(ERROR, "invalid flags: %d", flags);
--- 245,252 ----
  	}
  	else if (flags & INV_READ)
  	{
! 		retval->snapshot = RegisterSnapshot(ActiveSnapshot);
  		retval->flags = IFS_RDLOCK;
  	}
  	else
  		elog(ERROR, "invalid flags: %d", flags);
*************** inv_close(LargeObjectDesc *obj_desc)
*** 273,279 ****
  {
  	Assert(PointerIsValid(obj_desc));
  	if (obj_desc->snapshot != SnapshotNow)
! 		FreeSnapshot(obj_desc->snapshot);
  	pfree(obj_desc);
  }
  
--- 269,275 ----
  {
  	Assert(PointerIsValid(obj_desc));
  	if (obj_desc->snapshot != SnapshotNow)
! 		UnregisterSnapshot(obj_desc->snapshot);
  	pfree(obj_desc);
  }
  
Index: src/backend/tcop/fastpath.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/fastpath.c,v
retrieving revision 1.99
diff -c -p -r1.99 fastpath.c
*** src/backend/tcop/fastpath.c	26 Mar 2008 18:48:59 -0000	1.99
--- src/backend/tcop/fastpath.c	27 Mar 2008 13:16:16 -0000
*************** HandleFunctionRequest(StringInfo msgBuf)
*** 309,315 ****
  	 * Now that we know we are in a valid transaction, set snapshot in case
  	 * needed by function itself or one of the datatype I/O routines.
  	 */
! 	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Begin parsing the buffer contents.
--- 309,315 ----
  	 * Now that we know we are in a valid transaction, set snapshot in case
  	 * needed by function itself or one of the datatype I/O routines.
  	 */
! 	ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Begin parsing the buffer contents.
*************** HandleFunctionRequest(StringInfo msgBuf)
*** 396,401 ****
--- 396,405 ----
  
  	SendFunctionResult(retval, fcinfo.isnull, fip->rettype, rformat);
  
+ 	/* We no longer need the snapshot */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	/*
  	 * Emit duration logging if appropriate.
  	 */
Index: src/backend/tcop/postgres.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/postgres.c,v
retrieving revision 1.547
diff -c -p -r1.547 postgres.c
*** src/backend/tcop/postgres.c	26 Mar 2008 18:48:59 -0000	1.547
--- src/backend/tcop/postgres.c	27 Mar 2008 13:16:16 -0000
*************** pg_plan_queries(List *querytrees, int cu
*** 754,760 ****
  			{
  				if (needSnapshot && mySnapshot == NULL)
  				{
! 					mySnapshot = CopySnapshot(GetTransactionSnapshot());
  					ActiveSnapshot = mySnapshot;
  				}
  				stmt = (Node *) pg_plan_query(query, cursorOptions,
--- 754,760 ----
  			{
  				if (needSnapshot && mySnapshot == NULL)
  				{
! 					mySnapshot = RegisterSnapshot(GetTransactionSnapshot());
  					ActiveSnapshot = mySnapshot;
  				}
  				stmt = (Node *) pg_plan_query(query, cursorOptions,
*************** pg_plan_queries(List *querytrees, int cu
*** 765,771 ****
  		}
  
  		if (mySnapshot)
! 			FreeSnapshot(mySnapshot);
  	}
  	PG_CATCH();
  	{
--- 765,771 ----
  		}
  
  		if (mySnapshot)
! 			UnregisterSnapshot(mySnapshot);
  	}
  	PG_CATCH();
  	{
Index: src/backend/tcop/pquery.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/pquery.c,v
retrieving revision 1.122
diff -c -p -r1.122 pquery.c
*** src/backend/tcop/pquery.c	26 Mar 2008 18:48:59 -0000	1.122
--- src/backend/tcop/pquery.c	27 Mar 2008 13:16:16 -0000
*************** CreateQueryDesc(PlannedStmt *plannedstmt
*** 70,77 ****
  	qd->operation = plannedstmt->commandType;	/* operation */
  	qd->plannedstmt = plannedstmt;		/* plan */
  	qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
! 	qd->snapshot = snapshot;	/* snapshot */
! 	qd->crosscheck_snapshot = crosscheck_snapshot;		/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->doInstrument = doInstrument;	/* instrumentation wanted? */
--- 70,78 ----
  	qd->operation = plannedstmt->commandType;	/* operation */
  	qd->plannedstmt = plannedstmt;		/* plan */
  	qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
! 	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
! 	/* RI check snapshot */
! 	qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->doInstrument = doInstrument;	/* instrumentation wanted? */
*************** CreateUtilityQueryDesc(Node *utilitystmt
*** 98,104 ****
  	qd->operation = CMD_UTILITY;	/* operation */
  	qd->plannedstmt = NULL;
  	qd->utilitystmt = utilitystmt;		/* utility command */
! 	qd->snapshot = snapshot;	/* snapshot */
  	qd->crosscheck_snapshot = InvalidSnapshot;	/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
--- 99,105 ----
  	qd->operation = CMD_UTILITY;	/* operation */
  	qd->plannedstmt = NULL;
  	qd->utilitystmt = utilitystmt;		/* utility command */
! 	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
  	qd->crosscheck_snapshot = InvalidSnapshot;	/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
*************** CreateUtilityQueryDesc(Node *utilitystmt
*** 118,123 ****
--- 119,127 ----
  void
  FreeQueryDesc(QueryDesc *qdesc)
  {
+ 	UnregisterSnapshot(qdesc->snapshot);
+ 	UnregisterSnapshot(qdesc->crosscheck_snapshot);
+ 
  	/* Can't be a live query */
  	Assert(qdesc->estate == NULL);
  	/* Only the QueryDesc itself need be freed */
*************** ProcessQuery(PlannedStmt *plan,
*** 155,161 ****
  	 * Must always set snapshot for plannable queries.	Note we assume that
  	 * caller will take care of restoring ActiveSnapshot on exit/error.
  	 */
! 	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Create the QueryDesc object
--- 159,165 ----
  	 * Must always set snapshot for plannable queries.	Note we assume that
  	 * caller will take care of restoring ActiveSnapshot on exit/error.
  	 */
! 	ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Create the QueryDesc object
*************** ProcessQuery(PlannedStmt *plan,
*** 220,230 ****
  	 * Now, we close down all the scans and free allocated resources.
  	 */
  	ExecutorEnd(queryDesc);
  
  	FreeQueryDesc(queryDesc);
- 
- 	FreeSnapshot(ActiveSnapshot);
- 	ActiveSnapshot = NULL;
  }
  
  /*
--- 224,232 ----
  	 * Now, we close down all the scans and free allocated resources.
  	 */
  	ExecutorEnd(queryDesc);
+ 	UnregisterSnapshot(ActiveSnapshot);
  
  	FreeQueryDesc(queryDesc);
  }
  
  /*
*************** PortalStart(Portal portal, ParamListInfo
*** 488,500 ****
  			case PORTAL_ONE_SELECT:
  
  				/*
! 				 * Must set snapshot before starting executor.	Be sure to
! 				 * copy it into the portal's context.
  				 */
  				if (snapshot)
! 					ActiveSnapshot = CopySnapshot(snapshot);
  				else
! 					ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  				/*
  				 * Create QueryDesc in portal's context; for the moment, set
--- 490,502 ----
  			case PORTAL_ONE_SELECT:
  
  				/*
! 				 * Must set snapshot before starting executor.  Note: we don't
! 				 * register it here; CreateQueryDesc registers its own copy.
  				 */
  				if (snapshot)
! 					ActiveSnapshot = snapshot;
  				else
! 					ActiveSnapshot = GetTransactionSnapshot();
  
  				/*
  				 * Create QueryDesc in portal's context; for the moment, set
*************** PortalRunUtility(Portal portal, Node *ut
*** 1167,1173 ****
  		  IsA(utilityStmt, NotifyStmt) ||
  		  IsA(utilityStmt, UnlistenStmt) ||
  		  IsA(utilityStmt, CheckPointStmt)))
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  	else
  		ActiveSnapshot = NULL;
  
--- 1169,1175 ----
  		  IsA(utilityStmt, NotifyStmt) ||
  		  IsA(utilityStmt, UnlistenStmt) ||
  		  IsA(utilityStmt, CheckPointStmt)))
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  	else
  		ActiveSnapshot = NULL;
  
*************** PortalRunUtility(Portal portal, Node *ut
*** 1182,1188 ****
  	MemoryContextSwitchTo(PortalGetHeapMemory(portal));
  
  	if (ActiveSnapshot)
! 		FreeSnapshot(ActiveSnapshot);
  	ActiveSnapshot = NULL;
  }
  
--- 1184,1190 ----
  	MemoryContextSwitchTo(PortalGetHeapMemory(portal));
  
  	if (ActiveSnapshot)
! 		UnregisterSnapshot(ActiveSnapshot);
  	ActiveSnapshot = NULL;
  }
  
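In ProcessQuery the same snapshot is now registered twice. The first registration happens when it becomes ActiveSnapshot. The second happens inside CreateQueryDesc, which finds that pointer already on the list and only bumps the count. The standalone model below traces that pairing. It is a sketch, not snapmgr.c. `MockSnapshot`, `reg`, `unreg`, `refcount_of` and `nregistered` are hypothetical stand-ins, and malloc/free replace palloc/pfree. Unlike the patch, the model also frees the copy when the count reaches zero.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for SnapshotData; one field is enough here. */
typedef struct MockSnapshot
{
	unsigned int curcid;
} MockSnapshot;

/* Registry entry modeled on SnapshotElt, minus the nest level. */
typedef struct Elt
{
	MockSnapshot *snap;			/* private copy owned by the registry */
	unsigned int refcount;
	struct Elt *next;
} Elt;

static Elt *head = NULL;

/* Like RegisterSnapshot: if this exact pointer is already on the list,
 * bump its count; otherwise store and return a fresh copy. */
static MockSnapshot *
reg(MockSnapshot *snap)
{
	Elt		   *elt;

	if (snap == NULL)
		return NULL;
	for (elt = head; elt != NULL; elt = elt->next)
	{
		if (elt->snap == snap)
		{
			elt->refcount++;
			return elt->snap;
		}
	}
	elt = malloc(sizeof(Elt));
	assert(elt != NULL);
	elt->snap = malloc(sizeof(MockSnapshot));
	assert(elt->snap != NULL);
	*elt->snap = *snap;			/* stands in for CopySnapshot */
	elt->refcount = 1;
	elt->next = head;
	head = elt;
	return elt->snap;
}

/* Like UnregisterSnapshot: drop one reference and unlink the entry at
 * zero.  Returns 0 when the pointer was never registered. */
static int
unreg(MockSnapshot *snap)
{
	Elt		  **link;

	for (link = &head; *link != NULL; link = &(*link)->next)
	{
		Elt		   *elt = *link;

		if (elt->snap != snap)
			continue;
		if (--elt->refcount == 0)
		{
			*link = elt->next;
			free(elt->snap);	/* the patch leaves the copy in SnapshotCxt */
			free(elt);
		}
		return 1;
	}
	return 0;
}

/* Reference count of a live registered pointer, 0 if not on the list. */
static unsigned int
refcount_of(const MockSnapshot *snap)
{
	const Elt  *elt;

	for (elt = head; elt != NULL; elt = elt->next)
		if (elt->snap == snap)
			return elt->refcount;
	return 0;
}

/* Number of entries currently on the list. */
static int
nregistered(void)
{
	int			n = 0;
	const Elt  *elt;

	for (elt = head; elt != NULL; elt = elt->next)
		n++;
	return n;
}
```

The assertions mirror ProcessQuery. The explicit UnregisterSnapshot(ActiveSnapshot) after ExecutorEnd takes the count from 2 to 1, and FreeQueryDesc takes it to 0. Dropping either call would leave an entry behind for AtEOXact_Snapshot to warn about at commit.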
Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.107
diff -c -p -r1.107 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c	26 Mar 2008 21:10:39 -0000	1.107
--- src/backend/utils/adt/ri_triggers.c	27 Mar 2008 13:16:16 -0000
*************** RI_Initial_Check(Trigger *trigger, Relat
*** 2756,2767 ****
  	/*
  	 * Run the plan.  For safety we force a current snapshot to be used. (In
  	 * serializable mode, this arguably violates serializability, but we
! 	 * really haven't got much choice.)  We need at most one tuple returned,
! 	 * so pass limit = 1.
  	 */
  	spi_result = SPI_execute_snapshot(qplan,
  									  NULL, NULL,
! 									  CopySnapshot(GetLatestSnapshot()),
  									  InvalidSnapshot,
  									  true, false, 1);
  
--- 2756,2768 ----
  	/*
  	 * Run the plan.  For safety we force a current snapshot to be used. (In
  	 * serializable mode, this arguably violates serializability, but we
! 	 * really haven't got much choice.)  We don't need to register the
! 	 * snapshot, because SPI_execute_snapshot will see to it.  We need at most
! 	 * one tuple returned, so pass limit = 1.
  	 */
  	spi_result = SPI_execute_snapshot(qplan,
  									  NULL, NULL,
! 									  GetLatestSnapshot(),
  									  InvalidSnapshot,
  									  true, false, 1);
  
*************** ri_PerformCheck(RI_QueryKey *qkey, SPIPl
*** 3311,3323 ****
  	 * caller passes detectNewRows == false then it's okay to do the query
  	 * with the transaction snapshot; otherwise we use a current snapshot, and
  	 * tell the executor to error out if it finds any rows under the current
! 	 * snapshot that wouldn't be visible per the transaction snapshot.
  	 */
  	if (IsXactIsoLevelSerializable && detectNewRows)
  	{
  		CommandCounterIncrement();		/* be sure all my own work is visible */
! 		test_snapshot = CopySnapshot(GetLatestSnapshot());
! 		crosscheck_snapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  	else
  	{
--- 3312,3326 ----
  	 * caller passes detectNewRows == false then it's okay to do the query
  	 * with the transaction snapshot; otherwise we use a current snapshot, and
  	 * tell the executor to error out if it finds any rows under the current
! 	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
! 	 * that SPI_execute_snapshot will register the snapshots, so we don't need
! 	 * to bother here.
  	 */
  	if (IsXactIsoLevelSerializable && detectNewRows)
  	{
  		CommandCounterIncrement();		/* be sure all my own work is visible */
! 		test_snapshot = GetLatestSnapshot();
! 		crosscheck_snapshot = GetTransactionSnapshot();
  	}
  	else
  	{
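The ri_triggers.c hunks drop the explicit CopySnapshot calls and pass GetLatestSnapshot() and GetTransactionSnapshot() straight through. That relies on RegisterSnapshot storing a copy of its argument rather than the argument itself. The following sketch models this with hypothetical names (`MockSnapshot`, `get_latest`, `reg`, `nregistered`). It assumes the source snapshot lives in a buffer that the next call overwrites.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for SnapshotData. */
typedef struct MockSnapshot
{
	unsigned int xmin;
} MockSnapshot;

typedef struct Elt
{
	MockSnapshot *snap;			/* private copy owned by the registry */
	unsigned int refcount;
	struct Elt *next;
} Elt;

static Elt *head = NULL;

/* Hypothetical stand-in for GetLatestSnapshot(): each call overwrites
 * one static buffer, so the returned pointer's contents are not stable. */
static MockSnapshot latest;
static unsigned int next_xmin = 100;

static MockSnapshot *
get_latest(void)
{
	latest.xmin = next_xmin++;
	return &latest;
}

/* Same lookup-then-copy logic as the patch's RegisterSnapshot: the
 * lookup compares against stored copies, never against the source. */
static MockSnapshot *
reg(MockSnapshot *snap)
{
	Elt		   *elt;

	if (snap == NULL)
		return NULL;
	for (elt = head; elt != NULL; elt = elt->next)
	{
		if (elt->snap == snap)
		{
			elt->refcount++;
			return elt->snap;
		}
	}
	elt = malloc(sizeof(Elt));
	assert(elt != NULL);
	elt->snap = malloc(sizeof(MockSnapshot));
	assert(elt->snap != NULL);
	*elt->snap = *snap;			/* stands in for CopySnapshot */
	elt->refcount = 1;
	elt->next = head;
	head = elt;
	return elt->snap;
}

/* Number of entries currently on the list. */
static int
nregistered(void)
{
	int			n = 0;
	const Elt  *elt;

	for (elt = head; elt != NULL; elt = elt->next)
		n++;
	return n;
}
```

One consequence is that the refcount only coalesces registrations of a pointer previously returned by RegisterSnapshot. Passing the same source pointer twice, as the last assertions check, yields two independent entries, and each one needs its own UnregisterSnapshot.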
Index: src/backend/utils/time/snapmgr.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/time/snapmgr.c,v
retrieving revision 1.1
diff -c -p -r1.1 snapmgr.c
*** src/backend/utils/time/snapmgr.c	26 Mar 2008 18:48:59 -0000	1.1
--- src/backend/utils/time/snapmgr.c	28 Mar 2008 13:43:43 -0000
***************
*** 14,20 ****
--- 14,22 ----
  
  #include "access/xact.h"
  #include "access/transam.h"
+ #include "storage/proc.h"
  #include "storage/procarray.h"
+ #include "utils/memutils.h"
  #include "utils/snapmgr.h"
  #include "utils/tqual.h"
  
*************** FreeXactSnapshot(void)
*** 170,172 ****
--- 172,372 ----
  	LatestSnapshot = NULL;
  	ActiveSnapshot = NULL;		/* just for cleanliness */
  }
+ 
+ /*
+  * Struct to keep track of the registered snapshots
+  */
+ typedef struct SnapshotElt
+ {
+ 	Snapshot	s_snap;
+ 	uint32		s_refcount;
+ 	uint32		s_level;
+ 	struct SnapshotElt	*s_next;
+ } SnapshotElt;
+ 
+ /*
+  * Head of the list of registered snapshots, and memory context where it
+  * resides
+  */
+ static SnapshotElt	   *SnapshotHead = NULL;
+ static MemoryContext	SnapshotCxt = NULL;
+ 
+ /*
+  * RegisterSnapshot
+  * 		Register a snapshot as being in use
+  *
+  * This registers a snapshot on our list.  If the snapshot has been registered
+  * previously, increment the reference count.
+  *
+  * If InvalidSnapshot is passed, it is not registered.  Non-MVCC snapshots
+  * are not rejected yet.  XXX -- should passing one be a hard error?
+  */
+ Snapshot
+ RegisterSnapshot(Snapshot snapshot)
+ {
+ 	SnapshotElt	*elt;
+ 	SnapshotElt	*newhead;
+ 	MemoryContext	oldcxt;
+ 
+ 	if (snapshot == InvalidSnapshot)
+ 		return InvalidSnapshot;
+ 
+ 	if (SnapshotCxt == NULL)
+ 	{
+ 		SnapshotCxt = AllocSetContextCreate(TopTransactionContext,
+ 											"Snapshot Context",
+ 											ALLOCSET_DEFAULT_MINSIZE,
+ 											ALLOCSET_DEFAULT_INITSIZE,
+ 											ALLOCSET_DEFAULT_MAXSIZE);
+ 	}
+ 
+ 	for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 	{
+ 		if (elt->s_snap == snapshot)
+ 		{
+ 			elt->s_refcount++;
+ 			return elt->s_snap;
+ 		}
+ 	}
+ 
+ 	oldcxt = MemoryContextSwitchTo(SnapshotCxt);
+ 	newhead = palloc(sizeof(SnapshotElt));
+ 	newhead->s_next = SnapshotHead;
+ 	newhead->s_snap = CopySnapshot(snapshot);
+ 	newhead->s_level = GetCurrentTransactionNestLevel();
+ 	newhead->s_refcount = 1;
+ 	MemoryContextSwitchTo(oldcxt);
+ 
+ 	SnapshotHead = newhead;
+ 
+ 	return SnapshotHead->s_snap;
+ }
+ 
+ /*
+  * UnregisterSnapshot
+  * 		Signals that a snapshot is no longer necessary
+  *
+  * At refcount zero the list entry is freed; the copy lives on in SnapshotCxt.
+  */
+ void
+ UnregisterSnapshot(Snapshot snapshot)
+ {
+ 	SnapshotElt	*prev = NULL;
+ 	SnapshotElt	*elt;
+ 	bool		found = false;
+ 
+ 	if (snapshot == InvalidSnapshot)
+ 		return;
+ 
+ 	for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 	{
+ 		if (elt->s_snap == snapshot)
+ 		{
+ 			elt->s_refcount--;
+ 			found = true;
+ 
+ 			if (elt->s_refcount == 0)
+ 			{
+ 				if (prev)
+ 					prev->s_next = elt->s_next;
+ 				else
+ 					SnapshotHead = elt->s_next;
+ 
+ 				elog(LOG, "found: refcount is now 0");
+ 
+ 				pfree(elt);
+ 			}
+ 			break;
+ 		}
+ 
+ 		prev = elt;
+ 	}
+ 
+ 	if (!found)
+ 		elog(WARNING, "unregistering failed for snapshot %p", snapshot);
+ }
+ 
+ /*
+  * At subtransaction abort, unregister all snapshots registered during this
+  * subtransaction.
+  */
+ void
+ AtSubAbort_Snapshot(void)
+ {
+ 	SnapshotElt	*prev = NULL;
+ 	SnapshotElt	*elt;
+ 	int		level = GetCurrentTransactionNestLevel();
+ 
+ 	for (elt = SnapshotHead; elt != NULL;)
+ 	{
+ 		if (elt->s_level == level)
+ 		{
+ 			SnapshotElt	*tofree;
+ 
+ 			if (prev)
+ 				prev->s_next = elt->s_next;
+ 			else
+ 				SnapshotHead = elt->s_next;
+ 
+ 			tofree = elt;
+ 			elt = elt->s_next;
+ 			pfree(tofree);
+ 		}
+ 		else
+ 		{
+ 			prev = elt;
+ 			elt = elt->s_next;
+ 		}
+ 	}
+ }
+ 
+ /*
+  * At subtransaction commit, reassign all snapshots to the parent subxact.
+  */
+ void
+ AtSubCommit_Snapshot(void)
+ {
+ 	SnapshotElt	*elt;
+ 	int		level = GetCurrentTransactionNestLevel();
+ 
+ 	for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 	{
+ 		if (elt->s_level == level)
+ 			elt->s_level--;
+ 	}
+ }
+ 
+ /*
+  * AtEOXact_Snapshot
+  *		Main transaction end handler for snapshots.
+  *
+  * At xact commit, we want to complain about any leftover snapshots, as a
+  * measure against forgotten snapshot cleanup calls.  On xact abort, it is
+  * expected that some snapshots are left behind; just clean up our state.
+  */
+ void
+ AtEOXact_Snapshot(bool is_commit)
+ {
+ 	if (SnapshotCxt == NULL)
+ 		return;
+ 
+ 	if (is_commit)
+ 	{
+ 		SnapshotElt	*elt;
+ 
+ 		/* complain about any snapshot that is still registered */
+ 		for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 		{
+ 			ereport(WARNING,
+ 					(errmsg("snapshot %p not destroyed at commit (%u refs)",
+ 							elt->s_snap, elt->s_refcount)));
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * And forget about them.  We don't need to reset the context -- it'll
+ 	 * go away with TopTransactionContext.
+ 	 */
+ 	SnapshotCxt = NULL;
+ 	SnapshotHead = NULL;
+ }
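The nest-level bookkeeping in AtSubCommit_Snapshot and AtSubAbort_Snapshot can be exercised outside the backend with a model that keeps only the level and refcount fields of SnapshotElt. In this sketch `cur_level` is a hypothetical stand-in for GetCurrentTransactionNestLevel(), an integer id replaces the Snapshot pointer, and malloc/free replace palloc/pfree. The helper names are illustrative only. The model shows three things. An entry registered in a subtransaction survives into the parent after subcommit. An entry left behind by an error is dropped at subabort while older entries survive. Leftovers are counted at commit.

```c
#include <assert.h>
#include <stdlib.h>

/* Registry entry modeled on SnapshotElt; an int id replaces the pointer. */
typedef struct Elt
{
	int			id;
	unsigned int refcount;
	int			level;			/* nest level at first registration */
	struct Elt *next;
} Elt;

static Elt *head = NULL;
static int	cur_level = 1;		/* stand-in for GetCurrentTransactionNestLevel() */

/* Register id: bump an existing entry, else add one at cur_level. */
static void
reg(int id)
{
	Elt		   *elt;

	for (elt = head; elt != NULL; elt = elt->next)
	{
		if (elt->id == id)
		{
			elt->refcount++;	/* level is left unchanged, as in the patch */
			return;
		}
	}
	elt = malloc(sizeof(Elt));
	assert(elt != NULL);
	elt->id = id;
	elt->refcount = 1;
	elt->level = cur_level;
	elt->next = head;
	head = elt;
}

static unsigned int
refcount_of(int id)
{
	const Elt  *elt;

	for (elt = head; elt != NULL; elt = elt->next)
		if (elt->id == id)
			return elt->refcount;
	return 0;
}

/* Like AtSubCommit_Snapshot, then pop the level (done elsewhere in the
 * backend). */
static void
sub_commit(void)
{
	Elt		   *elt;

	for (elt = head; elt != NULL; elt = elt->next)
		if (elt->level == cur_level)
			elt->level--;
	cur_level--;
}

/* Like AtSubAbort_Snapshot: drop every entry first registered at this
 * level regardless of refcount, then pop the level. */
static void
sub_abort(void)
{
	Elt		  **link = &head;

	while (*link != NULL)
	{
		Elt		   *elt = *link;

		if (elt->level == cur_level)
		{
			*link = elt->next;
			free(elt);
		}
		else
			link = &elt->next;
	}
	cur_level--;
}

/* Like the commit path of AtEOXact_Snapshot: count the leftovers (the
 * patch emits one WARNING per entry) and forget the list. */
static int
eoxact_commit(void)
{
	int			leaked = 0;

	while (head != NULL)
	{
		Elt		   *elt = head;

		head = elt->next;
		free(elt);
		leaked++;
	}
	return leaked;
}
```

The last steps of the test show a quirk of storing the level per entry rather than per reference. A reference added inside the aborted subtransaction to an entry created by its parent is not dropped, so that entry's count stays at 2.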
Index: src/include/utils/snapmgr.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/utils/snapmgr.h,v
retrieving revision 1.1
diff -c -p -r1.1 snapmgr.h
*** src/include/utils/snapmgr.h	26 Mar 2008 18:48:59 -0000	1.1
--- src/include/utils/snapmgr.h	27 Mar 2008 13:46:58 -0000
*************** extern Snapshot CopySnapshot(Snapshot sn
*** 30,33 ****
--- 30,41 ----
  extern void FreeSnapshot(Snapshot snapshot);
  extern void FreeXactSnapshot(void);
  
+ extern Snapshot RegisterSnapshot(Snapshot snapshot);
+ extern void UnregisterSnapshot(Snapshot snapshot);
+ extern void AtSubAbort_Snapshot(void);
+ extern void AtSubCommit_Snapshot(void);
+ extern void AtEOXact_Snapshot(bool is_commit);
+ extern void AtPrepare_Snapshot(void);
+ extern void PostPrepare_Snapshot(void);
+ 
  #endif /* SNAPMGR_H */
Index: src/pl/plpgsql/src/pl_exec.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/pl/plpgsql/src/pl_exec.c,v
retrieving revision 1.206
diff -c -p -r1.206 pl_exec.c
*** src/pl/plpgsql/src/pl_exec.c	26 Mar 2008 18:48:59 -0000	1.206
--- src/pl/plpgsql/src/pl_exec.c	27 Mar 2008 13:16:24 -0000
*************** exec_eval_simple_expr(PLpgSQL_execstate 
*** 4241,4247 ****
  		if (!estate->readonly_func)
  		{
  			CommandCounterIncrement();
! 			ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  		}
  
  		/*
--- 4241,4247 ----
  		if (!estate->readonly_func)
  		{
  			CommandCounterIncrement();
! 			ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  		}
  
  		/*
*************** exec_eval_simple_expr(PLpgSQL_execstate 
*** 4252,4257 ****
--- 4252,4260 ----
  							   isNull,
  							   NULL);
  		MemoryContextSwitchTo(oldcontext);
+ 
+ 		if (!estate->readonly_func)
+ 			UnregisterSnapshot(ActiveSnapshot);
  	}
  	PG_CATCH();
  	{
-- 
Sent via pgsql-patches mailing list (pgsql-patches@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-patches
