This is the patch I am currently playing with, to keep track of the live snapshots within a transaction.
There are some obvious bugs left, and it opens certain questions. The most obvious bug is that it is not tracking SerializableSnapshot in a serializable transaction.

The main question I currently have is how to deal with ActiveSnapshot. Currently, in most places it is registered just like any other snapshot, and then unregistered. For the most part this works fine, but I wonder if it's not just a bug waiting to happen -- since it is a global variable and other places in the code feel free to assign to it, it is possible that some code sets it, then calls some other code that sets it to a different snapshot and then returns -- on exit, the first setter would try to unregister a snapshot that it didn't register.

So the question is, should we deal with ActiveSnapshot in some different way? For example snapmgr could know explicitly about it; keeping a stack perhaps, and have callers call a routine in snapmgr "I need this snap to be ActiveSnapshot" (push), and later "I no longer need ActiveSnapshot" (pop). On error, we can just forget about the part of the stack that corresponds to the current subxact. This would allow us to remove some PG_TRY blocks.

The other question is about CommitTransactionCommand. Currently my EOXact routine barfs for every snapshot not unregistered on main transaction commit -- a leak. I see this as a good thing, however it forced me to be more meticulous about not having ActiveSnapshot be set in commands that have multiple transactions like VACUUM, multitable CLUSTER and CREATE INDEX CONCURRENTLY. (Hmm, maybe this last point is also saying that we should be dealing with ActiveSnapshot explicitly in snapmgr.)

And finally, the last question is about some callers like COPY or EXPLAIN that get a snapshot (ActiveSnapshot or a different one), then modify Snapshot->curcid. I'm not sure this is a problem, really, but just to be careful I made them create a copy and register the copy.
The problem I have with it is that the snapshots are allocated only once when they are registered -- if somebody registers the same snapshot in the future, we will only increment the reference count and return the pointer to the previously stored snapshot. So if COPY reuses a snapshot that has been previously stored, it would mangle the stored copy and the other user could get an invalid view of the database. I'm not sure this is really a problem, because how could COPY get the same snapshot as anybody else? Still, it gives me a bad feeling.

(Hmm ... it just occurred to me that perhaps it would be better to improve the interaction between GetSnapshotData and RegisterSnapshot, to avoid some palloc traffic.)

--
Alvaro Herrera                                http://www.CommandPrompt.com/
PostgreSQL Replication, Consulting, Custom Development, 24x7 support
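
For what it's worth, the stack idea from the ActiveSnapshot question could be sketched roughly as below. This is a standalone toy in plain C, not code from the patch: ToySnapshot, toy_push_active, toy_pop_active, toy_get_active and toy_at_subabort are all made-up names, malloc stands in for palloc, and the subtransaction nesting level is passed in by hand instead of being read from the transaction state.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Toy stand-in for a snapshot; the real struct has many more fields. */
typedef struct ToySnapshot
{
    unsigned int curcid;    /* command ID this snapshot can see */
    int          refcount;  /* number of stack entries pointing here */
} ToySnapshot;

/* One stack entry: the snapshot made active, and who pushed it. */
typedef struct ActiveElt
{
    ToySnapshot      *snap;
    int               level;  /* nesting level of the pusher (assumed input) */
    struct ActiveElt *next;
} ActiveElt;

static ActiveElt *active_stack = NULL;

/* "I need this snap to be the active snapshot." */
void
toy_push_active(ToySnapshot *snap, int level)
{
    ActiveElt *elt = malloc(sizeof(ActiveElt));

    if (elt == NULL)
        abort();
    snap->refcount++;
    elt->snap = snap;
    elt->level = level;
    elt->next = active_stack;
    active_stack = elt;
}

/* "I no longer need the active snapshot." */
void
toy_pop_active(void)
{
    ActiveElt *elt = active_stack;

    assert(elt != NULL);
    active_stack = elt->next;
    elt->snap->refcount--;
    free(elt);
}

/* Current active snapshot, or NULL if nothing has been pushed. */
ToySnapshot *
toy_get_active(void)
{
    return active_stack ? active_stack->snap : NULL;
}

/* On subxact abort, forget every entry pushed at this level or deeper. */
void
toy_at_subabort(int level)
{
    while (active_stack != NULL && active_stack->level >= level)
        toy_pop_active();
}
```

A caller would push before running code that needs the snapshot and pop afterwards. Because each entry remembers the level that pushed it, an abort at that level can drop whatever was left behind, so the caller would not need a PG_TRY block just to restore the previous value, and nobody ends up unregistering a snapshot that somebody else set.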
Index: src/backend/access/transam/xact.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/xact.c,v retrieving revision 1.262 diff -c -p -r1.262 xact.c *** src/backend/access/transam/xact.c 26 Mar 2008 18:48:59 -0000 1.262 --- src/backend/access/transam/xact.c 27 Mar 2008 13:16:15 -0000 *************** CommitTransaction(void) *** 1752,1757 **** --- 1752,1758 ---- AtEOXact_ComboCid(); AtEOXact_HashTables(true); AtEOXact_PgStat(true); + AtEOXact_Snapshot(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; *************** PrepareTransaction(void) *** 1984,1989 **** --- 1985,1991 ---- AtEOXact_ComboCid(); AtEOXact_HashTables(true); /* don't call AtEOXact_PgStat here */ + AtEOXact_Snapshot(true); CurrentResourceOwner = NULL; ResourceOwnerDelete(TopTransactionResourceOwner); *************** AbortTransaction(void) *** 2128,2133 **** --- 2130,2136 ---- AtEOXact_ComboCid(); AtEOXact_HashTables(false); AtEOXact_PgStat(false); + AtEOXact_Snapshot(false); pgstat_report_xact_timestamp(0); /* *************** CommitSubTransaction(void) *** 3806,3811 **** --- 3809,3815 ---- AtSubCommit_Notify(); AtEOSubXact_UpdateFlatFiles(true, s->subTransactionId, s->parent->subTransactionId); + AtSubCommit_Snapshot(); CallSubXactCallbacks(SUBXACT_EVENT_COMMIT_SUB, s->subTransactionId, s->parent->subTransactionId); *************** AbortSubTransaction(void) *** 3926,3931 **** --- 3930,3936 ---- AtSubAbort_Notify(); AtEOSubXact_UpdateFlatFiles(false, s->subTransactionId, s->parent->subTransactionId); + AtSubAbort_Snapshot(); /* Advertise the fact that we aborted in pg_clog. 
*/ (void) RecordTransactionAbort(true); Index: src/backend/catalog/index.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/catalog/index.c,v retrieving revision 1.296 diff -c -p -r1.296 index.c *** src/backend/catalog/index.c 26 Mar 2008 21:10:37 -0000 1.296 --- src/backend/catalog/index.c 27 Mar 2008 13:16:15 -0000 *************** IndexBuildHeapScan(Relation heapRelation *** 1466,1471 **** --- 1466,1472 ---- TransactionId OldestXmin; BlockNumber root_blkno = InvalidBlockNumber; OffsetNumber root_offsets[MaxHeapTuplesPerPage]; + bool unreg_snapshot = false; /* * sanity checks *************** IndexBuildHeapScan(Relation heapRelation *** 1502,1508 **** } else if (indexInfo->ii_Concurrent) { ! snapshot = CopySnapshot(GetTransactionSnapshot()); OldestXmin = InvalidTransactionId; /* not used */ } else --- 1503,1510 ---- } else if (indexInfo->ii_Concurrent) { ! snapshot = RegisterSnapshot(GetTransactionSnapshot()); ! unreg_snapshot = true; OldestXmin = InvalidTransactionId; /* not used */ } else *************** IndexBuildHeapScan(Relation heapRelation *** 1787,1792 **** --- 1789,1796 ---- } heap_endscan(scan); + if (unreg_snapshot) + UnregisterSnapshot(snapshot); ExecDropSingleTupleTableSlot(slot); Index: src/backend/commands/cluster.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/cluster.c,v retrieving revision 1.172 diff -c -p -r1.172 cluster.c *** src/backend/commands/cluster.c 26 Mar 2008 21:10:37 -0000 1.172 --- src/backend/commands/cluster.c 27 Mar 2008 13:16:15 -0000 *************** cluster(ClusterStmt *stmt, bool isTopLev *** 211,216 **** --- 211,217 ---- rvs = get_tables_to_cluster(cluster_context); /* Commit to get out of starting transaction */ + UnregisterSnapshot(ActiveSnapshot); CommitTransactionCommand(); /* Ok, now that we've got them all, cluster them one by one */ *************** cluster(ClusterStmt 
*stmt, bool isTopLev *** 221,228 **** /* Start a new transaction for each relation. */ StartTransactionCommand(); /* functions in indexes may want a snapshot set */ ! ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); cluster_rel(rvtc, true); CommitTransactionCommand(); } --- 222,230 ---- /* Start a new transaction for each relation. */ StartTransactionCommand(); /* functions in indexes may want a snapshot set */ ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); cluster_rel(rvtc, true); + UnregisterSnapshot(ActiveSnapshot); CommitTransactionCommand(); } Index: src/backend/commands/copy.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/copy.c,v retrieving revision 1.298 diff -c -p -r1.298 copy.c *** src/backend/commands/copy.c 26 Mar 2008 18:48:59 -0000 1.298 --- src/backend/commands/copy.c 27 Mar 2008 13:16:15 -0000 *************** DoCopy(const CopyStmt *stmt, const char *** 1003,1008 **** --- 1003,1009 ---- Query *query; PlannedStmt *plan; DestReceiver *dest; + Snapshot snap; Assert(!is_from); cstate->rel = NULL; *************** DoCopy(const CopyStmt *stmt, const char *** 1044,1056 **** plan = planner(query, 0, NULL); /* ! * Update snapshot command ID to ensure this query sees results of any ! * previously executed queries. (It's a bit cheesy to modify ! * ActiveSnapshot without making a copy, but for the limited ways in ! * which COPY can be invoked, I think it's OK, because the active ! * snapshot shouldn't be shared with anything else anyway.) */ ! ActiveSnapshot->curcid = GetCurrentCommandId(false); /* Create dest receiver for COPY OUT */ dest = CreateDestReceiver(DestCopyOut, NULL); --- 1045,1056 ---- plan = planner(query, 0, NULL); /* ! * Use a snapshot with updated command ID to ensure this query sees ! * results of any previously executed queries. We don't register ! * the snapshot here -- CreateQueryDesc does it. */ ! snap = CopySnapshot(ActiveSnapshot); ! 
snap->curcid = GetCurrentCommandId(false); /* Create dest receiver for COPY OUT */ dest = CreateDestReceiver(DestCopyOut, NULL); *************** DoCopy(const CopyStmt *stmt, const char *** 1058,1065 **** /* Create a QueryDesc requesting no output */ cstate->queryDesc = CreateQueryDesc(plan, ! ActiveSnapshot, InvalidSnapshot, dest, NULL, false); /* * Call ExecutorStart to prepare the plan for execution. --- 1058,1066 ---- /* Create a QueryDesc requesting no output */ cstate->queryDesc = CreateQueryDesc(plan, ! snap, InvalidSnapshot, dest, NULL, false); + FreeSnapshot(snap); /* * Call ExecutorStart to prepare the plan for execution. Index: src/backend/commands/explain.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/explain.c,v retrieving revision 1.171 diff -c -p -r1.171 explain.c *** src/backend/commands/explain.c 26 Mar 2008 18:48:59 -0000 1.171 --- src/backend/commands/explain.c 27 Mar 2008 13:17:43 -0000 *************** ExplainOnePlan(PlannedStmt *plannedstmt, *** 226,246 **** ExplainState *es; StringInfoData buf; int eflags; /* ! * Update snapshot command ID to ensure this query sees results of any ! * previously executed queries. (It's a bit cheesy to modify ! * ActiveSnapshot without making a copy, but for the limited ways in which ! * EXPLAIN can be invoked, I think it's OK, because the active snapshot ! * shouldn't be shared with anything else anyway.) */ ! ActiveSnapshot->curcid = GetCurrentCommandId(false); /* Create a QueryDesc requesting no output */ queryDesc = CreateQueryDesc(plannedstmt, ! ActiveSnapshot, InvalidSnapshot, None_Receiver, params, stmt->analyze); INSTR_TIME_SET_CURRENT(starttime); --- 226,246 ---- ExplainState *es; StringInfoData buf; int eflags; + Snapshot snap; /* ! * Update a snapshot with an updated command ID to ensure this query sees ! * results of any previously executed queries. */ ! snap = CopySnapshot(ActiveSnapshot); ! 
snap->curcid = GetCurrentCommandId(false); /* Create a QueryDesc requesting no output */ queryDesc = CreateQueryDesc(plannedstmt, ! snap, InvalidSnapshot, None_Receiver, params, stmt->analyze); + FreeSnapshot(snap); INSTR_TIME_SET_CURRENT(starttime); Index: src/backend/commands/indexcmds.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/indexcmds.c,v retrieving revision 1.174 diff -c -p -r1.174 indexcmds.c *** src/backend/commands/indexcmds.c 26 Mar 2008 21:10:37 -0000 1.174 --- src/backend/commands/indexcmds.c 27 Mar 2008 13:16:15 -0000 *************** DefineIndex(RangeVar *heapRelation, *** 483,488 **** --- 483,491 ---- */ LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock); + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + CommitTransactionCommand(); StartTransactionCommand(); *************** DefineIndex(RangeVar *heapRelation, *** 541,547 **** indexRelation = index_open(indexRelationId, RowExclusiveLock); /* Set ActiveSnapshot since functions in the indexes may need it */ ! ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); /* We have to re-build the IndexInfo struct, since it was lost in commit */ indexInfo = BuildIndexInfo(indexRelation); --- 544,550 ---- indexRelation = index_open(indexRelationId, RowExclusiveLock); /* Set ActiveSnapshot since functions in the indexes may need it */ ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); /* We have to re-build the IndexInfo struct, since it was lost in commit */ indexInfo = BuildIndexInfo(indexRelation); *************** DefineIndex(RangeVar *heapRelation, *** 580,585 **** --- 583,592 ---- heap_close(pg_index, RowExclusiveLock); + /* we can do away with our snapshot */ + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + /* * Commit this transaction to make the indisready update visible. 
*/ *************** DefineIndex(RangeVar *heapRelation, *** 615,621 **** * We also set ActiveSnapshot to this snap, since functions in indexes may * need a snapshot. */ ! snapshot = CopySnapshot(GetTransactionSnapshot()); ActiveSnapshot = snapshot; /* --- 622,628 ---- * We also set ActiveSnapshot to this snap, since functions in indexes may * need a snapshot. */ ! snapshot = RegisterSnapshot(GetTransactionSnapshot()); ActiveSnapshot = snapshot; /* *************** DefineIndex(RangeVar *heapRelation, *** 624,629 **** --- 631,642 ---- validate_index(relationId, indexRelationId, snapshot); /* + * Note: we no longer need the validating snapshot here anymore, but we + * still need ActiveSnapshot, so we'll call UnregisterSnapshot once at the + * end. + */ + + /* * The index is now valid in the sense that it contains all currently * interesting tuples. But since it might not contain tuples deleted just * before the reference snap was taken, we have to wait out any *************** DefineIndex(RangeVar *heapRelation, *** 685,690 **** --- 698,707 ---- */ CacheInvalidateRelcacheByRelid(heaprelid.relId); + /* we can now unregister ActiveSnapshot */ + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + /* * Last thing to do is release the session-level lock on the parent table. */ *************** ReindexDatabase(const char *databaseName *** 1452,1457 **** --- 1469,1476 ---- heap_close(relationRelation, AccessShareLock); /* Now reindex each rel in a separate transaction */ + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; CommitTransactionCommand(); foreach(l, relids) { *************** ReindexDatabase(const char *databaseName *** 1459,1469 **** StartTransactionCommand(); /* functions in indexes may want a snapshot set */ ! 
ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); if (reindex_relation(relid, true)) ereport(NOTICE, (errmsg("table \"%s\" was reindexed", get_rel_name(relid)))); CommitTransactionCommand(); } StartTransactionCommand(); --- 1478,1490 ---- StartTransactionCommand(); /* functions in indexes may want a snapshot set */ ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); if (reindex_relation(relid, true)) ereport(NOTICE, (errmsg("table \"%s\" was reindexed", get_rel_name(relid)))); + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; CommitTransactionCommand(); } StartTransactionCommand(); Index: src/backend/commands/trigger.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/trigger.c,v retrieving revision 1.230 diff -c -p -r1.230 trigger.c *** src/backend/commands/trigger.c 26 Mar 2008 21:10:38 -0000 1.230 --- src/backend/commands/trigger.c 27 Mar 2008 13:16:15 -0000 *************** void *** 2884,2889 **** --- 2884,2890 ---- AfterTriggerFireDeferred(void) { AfterTriggerEventList *events; + bool unreg_snapshot = false; /* Must be inside a transaction */ Assert(afterTriggers != NULL); *************** AfterTriggerFireDeferred(void) *** 2898,2904 **** */ events = &afterTriggers->events; if (events->head != NULL) ! ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); /* * Run all the remaining triggers. Loop until they are all gone, in case --- 2899,2908 ---- */ events = &afterTriggers->events; if (events->head != NULL) ! { ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); ! unreg_snapshot = true; ! } /* * Run all the remaining triggers. 
Loop until they are all gone, in case *************** AfterTriggerFireDeferred(void) *** 2911,2916 **** --- 2915,2926 ---- afterTriggerInvokeEvents(events, firing_id, NULL, true); } + if (unreg_snapshot) + { + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + } + Assert(events->head == NULL); } Index: src/backend/commands/vacuum.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v retrieving revision 1.371 diff -c -p -r1.371 vacuum.c *** src/backend/commands/vacuum.c 26 Mar 2008 21:10:38 -0000 1.371 --- src/backend/commands/vacuum.c 27 Mar 2008 13:16:15 -0000 *************** vacuum(VacuumStmt *vacstmt, List *relids *** 407,412 **** --- 407,418 ---- */ if (use_own_xacts) { + /* ActiveSnapshot is not set by autovacuum */ + if (ActiveSnapshot) + { + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + } /* matches the StartTransaction in PostgresMain() */ CommitTransactionCommand(); } *************** vacuum(VacuumStmt *vacstmt, List *relids *** 444,450 **** { StartTransactionCommand(); /* functions in indexes may want a snapshot set */ ! ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); } else old_context = MemoryContextSwitchTo(anl_context); --- 450,456 ---- { StartTransactionCommand(); /* functions in indexes may want a snapshot set */ ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); } else old_context = MemoryContextSwitchTo(anl_context); *************** vacuum(VacuumStmt *vacstmt, List *relids *** 452,458 **** --- 458,468 ---- analyze_rel(relid, vacstmt, vac_strategy); if (use_own_xacts) + { + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; CommitTransactionCommand(); + } else { MemoryContextSwitchTo(old_context); *************** vacuum_rel(Oid relid, VacuumStmt *vacstm *** 979,985 **** if (vacstmt->full) { /* functions in indexes may want a snapshot set */ ! 
ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); } else { --- 989,995 ---- if (vacstmt->full) { /* functions in indexes may want a snapshot set */ ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); } else { *************** vacuum_rel(Oid relid, VacuumStmt *vacstm *** 1138,1143 **** --- 1148,1160 ---- /* all done with this class, but hold lock until commit */ relation_close(onerel, NoLock); + /* do away with our snapshot */ + if (vacstmt->full) + { + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + } + /* * Complete the transaction and free all temporary memory used. */ Index: src/backend/executor/execMain.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/execMain.c,v retrieving revision 1.304 diff -c -p -r1.304 execMain.c *** src/backend/executor/execMain.c 26 Mar 2008 21:10:38 -0000 1.304 --- src/backend/executor/execMain.c 27 Mar 2008 14:19:05 -0000 *************** *** 52,57 **** --- 52,58 ---- #include "utils/acl.h" #include "utils/lsyscache.h" #include "utils/memutils.h" + #include "utils/snapmgr.h" #include "utils/tqual.h" *************** ExecutorStart(QueryDesc *queryDesc, int *** 188,195 **** /* * Copy other important information into the EState */ ! estate->es_snapshot = queryDesc->snapshot; ! estate->es_crosscheck_snapshot = queryDesc->crosscheck_snapshot; estate->es_instrument = queryDesc->doInstrument; /* --- 189,196 ---- /* * Copy other important information into the EState */ ! estate->es_snapshot = RegisterSnapshot(queryDesc->snapshot); ! 
estate->es_crosscheck_snapshot = RegisterSnapshot(queryDesc->crosscheck_snapshot); estate->es_instrument = queryDesc->doInstrument; /* *************** ExecutorEnd(QueryDesc *queryDesc) *** 316,321 **** --- 317,326 ---- if (estate->es_select_into) CloseIntoRel(queryDesc); + /* do away with our snapshots */ + UnregisterSnapshot(estate->es_snapshot); + UnregisterSnapshot(estate->es_crosscheck_snapshot); + /* * Must switch out of context before destroying it */ Index: src/backend/executor/functions.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/functions.c,v retrieving revision 1.124 diff -c -p -r1.124 functions.c *** src/backend/executor/functions.c 26 Mar 2008 18:48:59 -0000 1.124 --- src/backend/executor/functions.c 27 Mar 2008 13:16:15 -0000 *************** postquel_start(execution_state *es, SQLF *** 295,309 **** * In a read-only function, use the surrounding query's snapshot; * otherwise take a new snapshot for each query. The snapshot should * include a fresh command ID so that all work to date in this transaction ! * is visible. We copy in both cases so that postquel_end can ! * unconditionally do FreeSnapshot. */ if (fcache->readonly_func) ! snapshot = CopySnapshot(ActiveSnapshot); else { CommandCounterIncrement(); ! snapshot = CopySnapshot(GetTransactionSnapshot()); } if (IsA(es->stmt, PlannedStmt)) --- 295,309 ---- * In a read-only function, use the surrounding query's snapshot; * otherwise take a new snapshot for each query. The snapshot should * include a fresh command ID so that all work to date in this transaction ! * is visible. No need to create a separate copy; CreateQueryDesc will ! * register it appropriately. */ if (fcache->readonly_func) ! snapshot = ActiveSnapshot; else { CommandCounterIncrement(); ! 
snapshot = GetTransactionSnapshot(); } if (IsA(es->stmt, PlannedStmt)) *************** postquel_end(execution_state *es) *** 425,431 **** ActiveSnapshot = saveActiveSnapshot; } - FreeSnapshot(es->qd->snapshot); FreeQueryDesc(es->qd); es->qd = NULL; } --- 425,430 ---- Index: src/backend/executor/spi.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/spi.c,v retrieving revision 1.191 diff -c -p -r1.191 spi.c *** src/backend/executor/spi.c 26 Mar 2008 18:48:59 -0000 1.191 --- src/backend/executor/spi.c 27 Mar 2008 13:16:15 -0000 *************** SPI_execp(SPIPlanPtr plan, Datum *Values *** 365,373 **** /* * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow ! * the caller to specify exactly which snapshots to use. Also, the caller ! * may specify that AFTER triggers should be queued as part of the outer ! * query rather than being fired immediately at the end of the command. * * This is currently not documented in spi.sgml because it is only intended * for use by RI triggers. --- 365,374 ---- /* * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow ! * the caller to specify exactly which snapshots to use, which will be ! * registered here. Also, the caller may specify that AFTER triggers should be ! * queued as part of the outer query rather than being fired immediately at the ! * end of the command. * * This is currently not documented in spi.sgml because it is only intended * for use by RI triggers. *************** SPI_cursor_open(const char *name, SPIPla *** 1028,1034 **** } /* ! * Set up the snapshot to use. (PortalStart will do CopySnapshot, so we * skip that here.) */ if (read_only) --- 1029,1035 ---- } /* ! * Set up the snapshot to use. (PortalStart will do RegisterSnapshot, so we * skip that here.) 
*/ if (read_only) *************** _SPI_execute_plan(SPIPlanPtr plan, Datum *** 1598,1606 **** * ActiveSnapshot; if read-write, grab a full new snap. */ if (read_only) ! ActiveSnapshot = CopySnapshot(saveActiveSnapshot); else ! ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); } else { --- 1599,1607 ---- * ActiveSnapshot; if read-write, grab a full new snap. */ if (read_only) ! ActiveSnapshot = RegisterSnapshot(saveActiveSnapshot); else ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); } else { *************** _SPI_execute_plan(SPIPlanPtr plan, Datum *** 1609,1617 **** * exactly that snapshot, but read-write means use the * snap with advancing of command ID. */ ! ActiveSnapshot = CopySnapshot(snapshot); ! if (!read_only) ! ActiveSnapshot->curcid = GetCurrentCommandId(false); } if (IsA(stmt, PlannedStmt) && --- 1610,1626 ---- * exactly that snapshot, but read-write means use the * snap with advancing of command ID. */ ! if (read_only) ! ActiveSnapshot = RegisterSnapshot(snapshot); ! else ! { ! Snapshot snap; ! ! snap = CopySnapshot(snapshot); ! snap->curcid = GetCurrentCommandId(false); ! ActiveSnapshot = RegisterSnapshot(snap); ! FreeSnapshot(snap); ! } } if (IsA(stmt, PlannedStmt) && *************** _SPI_execute_plan(SPIPlanPtr plan, Datum *** 1641,1647 **** _SPI_current->processed = _SPI_current->tuptable->alloced - _SPI_current->tuptable->free; res = SPI_OK_UTILITY; } ! FreeSnapshot(ActiveSnapshot); ActiveSnapshot = NULL; /* --- 1650,1656 ---- _SPI_current->processed = _SPI_current->tuptable->alloced - _SPI_current->tuptable->free; res = SPI_OK_UTILITY; } ! 
UnregisterSnapshot(ActiveSnapshot); ActiveSnapshot = NULL; /* Index: src/backend/storage/ipc/procarray.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/procarray.c,v retrieving revision 1.43 diff -c -p -r1.43 procarray.c *** src/backend/storage/ipc/procarray.c 26 Mar 2008 18:48:59 -0000 1.43 --- src/backend/storage/ipc/procarray.c 28 Mar 2008 13:42:55 -0000 *************** GetSnapshotData(Snapshot snapshot, bool *** 681,691 **** Assert(snapshot != NULL); - /* Serializable snapshot must be computed before any other... */ - Assert(serializable ? - !TransactionIdIsValid(MyProc->xmin) : - TransactionIdIsValid(MyProc->xmin)); - /* * Allocating space for maxProcs xids is usually overkill; numProcs would * be sufficient. But it seems better to do the malloc while not holding --- 681,686 ---- Index: src/backend/storage/large_object/inv_api.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/large_object/inv_api.c,v retrieving revision 1.131 diff -c -p -r1.131 inv_api.c *** src/backend/storage/large_object/inv_api.c 26 Mar 2008 21:10:38 -0000 1.131 --- src/backend/storage/large_object/inv_api.c 27 Mar 2008 13:16:16 -0000 *************** inv_open(Oid lobjId, int flags, MemoryCo *** 245,256 **** } else if (flags & INV_READ) { ! /* be sure to copy snap into mcxt */ ! MemoryContext oldContext = MemoryContextSwitchTo(mcxt); ! ! retval->snapshot = CopySnapshot(ActiveSnapshot); retval->flags = IFS_RDLOCK; - MemoryContextSwitchTo(oldContext); } else elog(ERROR, "invalid flags: %d", flags); --- 245,252 ---- } else if (flags & INV_READ) { ! retval->snapshot = RegisterSnapshot(ActiveSnapshot); retval->flags = IFS_RDLOCK; } else elog(ERROR, "invalid flags: %d", flags); *************** inv_close(LargeObjectDesc *obj_desc) *** 273,279 **** { Assert(PointerIsValid(obj_desc)); if (obj_desc->snapshot != SnapshotNow) ! 
FreeSnapshot(obj_desc->snapshot); pfree(obj_desc); } --- 269,275 ---- { Assert(PointerIsValid(obj_desc)); if (obj_desc->snapshot != SnapshotNow) ! UnregisterSnapshot(obj_desc->snapshot); pfree(obj_desc); } Index: src/backend/tcop/fastpath.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/fastpath.c,v retrieving revision 1.99 diff -c -p -r1.99 fastpath.c *** src/backend/tcop/fastpath.c 26 Mar 2008 18:48:59 -0000 1.99 --- src/backend/tcop/fastpath.c 27 Mar 2008 13:16:16 -0000 *************** HandleFunctionRequest(StringInfo msgBuf) *** 309,315 **** * Now that we know we are in a valid transaction, set snapshot in case * needed by function itself or one of the datatype I/O routines. */ ! ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); /* * Begin parsing the buffer contents. --- 309,315 ---- * Now that we know we are in a valid transaction, set snapshot in case * needed by function itself or one of the datatype I/O routines. */ ! ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot()); /* * Begin parsing the buffer contents. *************** HandleFunctionRequest(StringInfo msgBuf) *** 396,401 **** --- 396,405 ---- SendFunctionResult(retval, fcinfo.isnull, fip->rettype, rformat); + /* We no longer need the snapshot */ + UnregisterSnapshot(ActiveSnapshot); + ActiveSnapshot = NULL; + /* * Emit duration logging if appropriate. */ Index: src/backend/tcop/postgres.c =================================================================== RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/postgres.c,v retrieving revision 1.547 diff -c -p -r1.547 postgres.c *** src/backend/tcop/postgres.c 26 Mar 2008 18:48:59 -0000 1.547 --- src/backend/tcop/postgres.c 27 Mar 2008 13:16:16 -0000 *************** pg_plan_queries(List *querytrees, int cu *** 754,760 **** { if (needSnapshot && mySnapshot == NULL) { ! 
mySnapshot = CopySnapshot(GetTransactionSnapshot());
                      ActiveSnapshot = mySnapshot;
                  }
                  stmt = (Node *) pg_plan_query(query, cursorOptions,
--- 754,760 ----
              {
                  if (needSnapshot && mySnapshot == NULL)
                  {
!                     mySnapshot = RegisterSnapshot(GetTransactionSnapshot());
                      ActiveSnapshot = mySnapshot;
                  }
                  stmt = (Node *) pg_plan_query(query, cursorOptions,
*************** pg_plan_queries(List *querytrees, int cu
*** 765,771 ****
          }

          if (mySnapshot)
!             FreeSnapshot(mySnapshot);
      }
      PG_CATCH();
      {
--- 765,771 ----
          }

          if (mySnapshot)
!             UnregisterSnapshot(mySnapshot);
      }
      PG_CATCH();
      {
Index: src/backend/tcop/pquery.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/pquery.c,v
retrieving revision 1.122
diff -c -p -r1.122 pquery.c
*** src/backend/tcop/pquery.c 26 Mar 2008 18:48:59 -0000 1.122
--- src/backend/tcop/pquery.c 27 Mar 2008 13:16:16 -0000
*************** CreateQueryDesc(PlannedStmt *plannedstmt
*** 70,77 ****
      qd->operation = plannedstmt->commandType;   /* operation */
      qd->plannedstmt = plannedstmt;      /* plan */
      qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
!     qd->snapshot = snapshot;    /* snapshot */
!     qd->crosscheck_snapshot = crosscheck_snapshot;      /* RI check snapshot */
      qd->dest = dest;            /* output dest */
      qd->params = params;        /* parameter values passed into query */
      qd->doInstrument = doInstrument;    /* instrumentation wanted? */
--- 70,78 ----
      qd->operation = plannedstmt->commandType;   /* operation */
      qd->plannedstmt = plannedstmt;      /* plan */
      qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
!     qd->snapshot = RegisterSnapshot(snapshot);  /* snapshot */
!     /* RI check snapshot */
!     qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
      qd->dest = dest;            /* output dest */
      qd->params = params;        /* parameter values passed into query */
      qd->doInstrument = doInstrument;    /* instrumentation wanted? */
*************** CreateUtilityQueryDesc(Node *utilitystmt
*** 98,104 ****
      qd->operation = CMD_UTILITY;    /* operation */
      qd->plannedstmt = NULL;
      qd->utilitystmt = utilitystmt;  /* utility command */
!     qd->snapshot = snapshot;    /* snapshot */
      qd->crosscheck_snapshot = InvalidSnapshot;  /* RI check snapshot */
      qd->dest = dest;            /* output dest */
      qd->params = params;        /* parameter values passed into query */
--- 99,105 ----
      qd->operation = CMD_UTILITY;    /* operation */
      qd->plannedstmt = NULL;
      qd->utilitystmt = utilitystmt;  /* utility command */
!     qd->snapshot = RegisterSnapshot(snapshot);  /* snapshot */
      qd->crosscheck_snapshot = InvalidSnapshot;  /* RI check snapshot */
      qd->dest = dest;            /* output dest */
      qd->params = params;        /* parameter values passed into query */
*************** CreateUtilityQueryDesc(Node *utilitystmt
*** 118,123 ****
--- 119,127 ----
  void
  FreeQueryDesc(QueryDesc *qdesc)
  {
+     UnregisterSnapshot(qdesc->snapshot);
+     UnregisterSnapshot(qdesc->crosscheck_snapshot);
+
      /* Can't be a live query */
      Assert(qdesc->estate == NULL);
      /* Only the QueryDesc itself need be freed */
*************** ProcessQuery(PlannedStmt *plan,
*** 155,161 ****
       * Must always set snapshot for plannable queries. Note we assume that
       * caller will take care of restoring ActiveSnapshot on exit/error.
       */
!     ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

      /*
       * Create the QueryDesc object
--- 159,165 ----
       * Must always set snapshot for plannable queries. Note we assume that
       * caller will take care of restoring ActiveSnapshot on exit/error.
       */
!     ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());

      /*
       * Create the QueryDesc object
*************** ProcessQuery(PlannedStmt *plan,
*** 220,230 ****
       * Now, we close down all the scans and free allocated resources.
       */
      ExecutorEnd(queryDesc);

      FreeQueryDesc(queryDesc);
-
-     FreeSnapshot(ActiveSnapshot);
-     ActiveSnapshot = NULL;
  }

  /*
--- 224,232 ----
       * Now, we close down all the scans and free allocated resources.
       */
      ExecutorEnd(queryDesc);
+     UnregisterSnapshot(ActiveSnapshot);

      FreeQueryDesc(queryDesc);
  }

  /*
*************** PortalStart(Portal portal, ParamListInfo
*** 488,500 ****
          case PORTAL_ONE_SELECT:

              /*
!              * Must set snapshot before starting executor. Be sure to
!              * copy it into the portal's context.
               */
              if (snapshot)
!                 ActiveSnapshot = CopySnapshot(snapshot);
              else
!                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

              /*
               * Create QueryDesc in portal's context; for the moment, set
--- 490,502 ----
          case PORTAL_ONE_SELECT:

              /*
!              * Must set snapshot before starting executor. Note: we don't
!              * register them here; they're registered in ExecutorStart.
               */
              if (snapshot)
!                 ActiveSnapshot = snapshot;
              else
!                 ActiveSnapshot = GetTransactionSnapshot();

              /*
               * Create QueryDesc in portal's context; for the moment, set
*************** PortalRunUtility(Portal portal, Node *ut
*** 1167,1173 ****
            IsA(utilityStmt, NotifyStmt) ||
            IsA(utilityStmt, UnlistenStmt) ||
            IsA(utilityStmt, CheckPointStmt)))
!         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
      else
          ActiveSnapshot = NULL;

--- 1169,1175 ----
            IsA(utilityStmt, NotifyStmt) ||
            IsA(utilityStmt, UnlistenStmt) ||
            IsA(utilityStmt, CheckPointStmt)))
!         ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
      else
          ActiveSnapshot = NULL;

*************** PortalRunUtility(Portal portal, Node *ut
*** 1182,1188 ****
      MemoryContextSwitchTo(PortalGetHeapMemory(portal));

      if (ActiveSnapshot)
!         FreeSnapshot(ActiveSnapshot);
      ActiveSnapshot = NULL;
  }

--- 1184,1190 ----
      MemoryContextSwitchTo(PortalGetHeapMemory(portal));

      if (ActiveSnapshot)
!         UnregisterSnapshot(ActiveSnapshot);
      ActiveSnapshot = NULL;
  }

Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.107
diff -c -p -r1.107 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c 26 Mar 2008 21:10:39 -0000 1.107
--- src/backend/utils/adt/ri_triggers.c 27 Mar 2008 13:16:16 -0000
*************** RI_Initial_Check(Trigger *trigger, Relat
*** 2756,2767 ****
      /*
       * Run the plan. For safety we force a current snapshot to be used. (In
       * serializable mode, this arguably violates serializability, but we
!      * really haven't got much choice.) We need at most one tuple returned,
!      * so pass limit = 1.
       */
      spi_result = SPI_execute_snapshot(qplan,
                                        NULL, NULL,
!                                       CopySnapshot(GetLatestSnapshot()),
                                        InvalidSnapshot,
                                        true, false, 1);

--- 2756,2768 ----
      /*
       * Run the plan. For safety we force a current snapshot to be used. (In
       * serializable mode, this arguably violates serializability, but we
!      * really haven't got much choice.) We don't need to register the
!      * snapshot, because SPI_execute_snapshot will see to it. We need at most
!      * one tuple returned, so pass limit = 1.
       */
      spi_result = SPI_execute_snapshot(qplan,
                                        NULL, NULL,
!                                       GetLatestSnapshot(),
                                        InvalidSnapshot,
                                        true, false, 1);

*************** ri_PerformCheck(RI_QueryKey *qkey, SPIPl
*** 3311,3323 ****
       * caller passes detectNewRows == false then it's okay to do the query
       * with the transaction snapshot; otherwise we use a current snapshot, and
       * tell the executor to error out if it finds any rows under the current
!      * snapshot that wouldn't be visible per the transaction snapshot.
       */
      if (IsXactIsoLevelSerializable && detectNewRows)
      {
          CommandCounterIncrement();      /* be sure all my own work is visible */
!         test_snapshot = CopySnapshot(GetLatestSnapshot());
!         crosscheck_snapshot = CopySnapshot(GetTransactionSnapshot());
      }
      else
      {
--- 3312,3326 ----
       * caller passes detectNewRows == false then it's okay to do the query
       * with the transaction snapshot; otherwise we use a current snapshot, and
       * tell the executor to error out if it finds any rows under the current
!      * snapshot that wouldn't be visible per the transaction snapshot. Note
!      * that SPI_execute_snapshot will register the snapshots, so we don't need
!      * to bother here.
       */
      if (IsXactIsoLevelSerializable && detectNewRows)
      {
          CommandCounterIncrement();      /* be sure all my own work is visible */
!         test_snapshot = GetLatestSnapshot();
!         crosscheck_snapshot = GetTransactionSnapshot();
      }
      else
      {
Index: src/backend/utils/time/snapmgr.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/time/snapmgr.c,v
retrieving revision 1.1
diff -c -p -r1.1 snapmgr.c
*** src/backend/utils/time/snapmgr.c 26 Mar 2008 18:48:59 -0000 1.1
--- src/backend/utils/time/snapmgr.c 28 Mar 2008 13:43:43 -0000
***************
*** 14,20 ****
--- 14,22 ----

  #include "access/xact.h"
  #include "access/transam.h"
+ #include "storage/proc.h"
  #include "storage/procarray.h"
+ #include "utils/memutils.h"
  #include "utils/snapmgr.h"
  #include "utils/tqual.h"

*************** FreeXactSnapshot(void)
*** 170,172 ****
--- 172,372 ----
      LatestSnapshot = NULL;
      ActiveSnapshot = NULL;      /* just for cleanliness */
  }
+
+ /*
+  * Struct to keep track of the registered snapshots
+  */
+ typedef struct SnapshotElt
+ {
+     Snapshot    s_snap;
+     uint32      s_refcount;
+     uint32      s_level;
+     struct SnapshotElt *s_next;
+ } SnapshotElt;
+
+ /*
+  * Head of the list of registered snapshots, and memory context where it
+  * resides
+  */
+ static SnapshotElt *SnapshotHead = NULL;
+ static MemoryContext SnapshotCxt = NULL;
+
+ /*
+  * RegisterSnapshot
+  *      Register a snapshot as being in use
+  *
+  * This registers a snapshot on our list. If the snapshot has been registered
+  * previously, increment the reference count.
+  *
+  * If InvalidSnapshot or a non-MVCC snapshot is passed, it is not registered.
+  * XXX -- should it be a hard error if a non-MVCC snapshot is passed?
+  */
+ Snapshot
+ RegisterSnapshot(Snapshot snapshot)
+ {
+     SnapshotElt *elt;
+     SnapshotElt *newhead;
+     MemoryContext oldcxt;
+
+     if (snapshot == InvalidSnapshot)
+         return InvalidSnapshot;
+
+     if (SnapshotCxt == NULL)
+     {
+         SnapshotCxt = AllocSetContextCreate(TopTransactionContext,
+                                             "Snapshot Context",
+                                             ALLOCSET_DEFAULT_MINSIZE,
+                                             ALLOCSET_DEFAULT_INITSIZE,
+                                             ALLOCSET_DEFAULT_MAXSIZE);
+     }
+
+     for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+     {
+         if (elt->s_snap == snapshot)
+         {
+             elt->s_refcount++;
+             return elt->s_snap;
+         }
+     }
+
+     oldcxt = MemoryContextSwitchTo(SnapshotCxt);
+     newhead = palloc(sizeof(SnapshotElt));
+     newhead->s_next = SnapshotHead;
+     newhead->s_snap = CopySnapshot(snapshot);
+     newhead->s_level = GetCurrentTransactionNestLevel();
+     newhead->s_refcount = 1;
+     MemoryContextSwitchTo(oldcxt);
+
+     SnapshotHead = newhead;
+
+     return SnapshotHead->s_snap;
+ }
+
+ /*
+  * UnregisterSnapshot
+  *      Signals that a snapshot is no longer necessary
+  *
+  * If the reference count falls to zero, the memory is released.
+  */
+ void
+ UnregisterSnapshot(Snapshot snapshot)
+ {
+     SnapshotElt *prev = NULL;
+     SnapshotElt *elt;
+     bool        found = false;
+
+     if (snapshot == InvalidSnapshot)
+         return;
+
+     for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+     {
+         if (elt->s_snap == snapshot)
+         {
+             elt->s_refcount--;
+             found = true;
+
+             if (elt->s_refcount == 0)
+             {
+                 if (prev)
+                     prev->s_next = elt->s_next;
+                 else
+                     SnapshotHead = elt->s_next;
+
+                 elog(LOG, "found: refcount is now 0");
+
+                 pfree(elt);
+             }
+             break;
+         }
+
+         prev = elt;
+     }
+
+     if (!found)
+         elog(WARNING, "unregistering failed for snapshot %p", snapshot);
+ }
+
+ /*
+  * At subtransaction abort, unregister all snapshots registered during this
+  * subtransaction.
+  * */
+ void
+ AtSubAbort_Snapshot(void)
+ {
+     SnapshotElt *prev = NULL;
+     SnapshotElt *elt;
+     int         level = GetCurrentTransactionNestLevel();
+
+     for (elt = SnapshotHead; elt != NULL;)
+     {
+         if (elt->s_level == level)
+         {
+             SnapshotElt *tofree;
+
+             if (prev)
+                 prev->s_next = elt->s_next;
+             else
+                 SnapshotHead = elt->s_next;
+
+             tofree = elt;
+             elt = elt->s_next;
+             pfree(tofree);
+         }
+         else
+         {
+             prev = elt;
+             elt = elt->s_next;
+         }
+     }
+ }
+
+ /*
+  * At subtransaction commit, reassign all snapshots to the parent subxact.
+  */
+ void
+ AtSubCommit_Snapshot(void)
+ {
+     SnapshotElt *elt;
+     int         level = GetCurrentTransactionNestLevel();
+
+     for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+     {
+         if (elt->s_level == level)
+             elt->s_level--;
+     }
+ }
+
+ /*
+  * AtEOXact_Snapshot
+  *      Main transaction end handler for snapshots.
+  *
+  * At xact commit, we want to complain about any leftover snapshots, as a
+  * measure against forgotten snapshot cleanup calls. On xact abort, it is
+  * expected that some snapshots are left behind; just cleanup our module.
+  */
+ void
+ AtEOXact_Snapshot(bool is_commit)
+ {
+     if (SnapshotCxt == NULL)
+         return;
+
+     if (is_commit)
+     {
+         SnapshotElt *elt;
+
+         /* complain about any unregistered snapshot */
+         for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+         {
+             ereport(WARNING,
+                     (errmsg("snapshot %p not destroyed at commit (%d refs)",
+                             elt->s_snap, elt->s_refcount)));
+         }
+     }
+
+     /*
+      * And forget about them. We don't need to reset the context -- it'll
+      * go away with TopTransactionContext.
+      */
+     SnapshotCxt = NULL;
+     SnapshotHead = NULL;
+ }
Index: src/include/utils/snapmgr.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/utils/snapmgr.h,v
retrieving revision 1.1
diff -c -p -r1.1 snapmgr.h
*** src/include/utils/snapmgr.h 26 Mar 2008 18:48:59 -0000 1.1
--- src/include/utils/snapmgr.h 27 Mar 2008 13:46:58 -0000
*************** extern Snapshot CopySnapshot(Snapshot sn
*** 30,33 ****
--- 30,41 ----
  extern void FreeSnapshot(Snapshot snapshot);
  extern void FreeXactSnapshot(void);

+ extern Snapshot RegisterSnapshot(Snapshot snapshot);
+ extern void UnregisterSnapshot(Snapshot snapshot);
+ extern void AtSubAbort_Snapshot(void);
+ extern void AtSubCommit_Snapshot(void);
+ extern void AtEOXact_Snapshot(bool is_commit);
+ extern void AtPrepare_Snapshot(void);
+ extern void PostPrepare_Snapshot(void);
+
  #endif /* SNAPMGR_H */
Index: src/pl/plpgsql/src/pl_exec.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/pl/plpgsql/src/pl_exec.c,v
retrieving revision 1.206
diff -c -p -r1.206 pl_exec.c
*** src/pl/plpgsql/src/pl_exec.c 26 Mar 2008 18:48:59 -0000 1.206
--- src/pl/plpgsql/src/pl_exec.c 27 Mar 2008 13:16:24 -0000
*************** exec_eval_simple_expr(PLpgSQL_execstate
*** 4241,4247 ****
          if (!estate->readonly_func)
          {
              CommandCounterIncrement();
!             ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
          }

          /*
--- 4241,4247 ----
          if (!estate->readonly_func)
          {
              CommandCounterIncrement();
!             ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
          }

          /*
*************** exec_eval_simple_expr(PLpgSQL_execstate
*** 4252,4257 ****
--- 4252,4260 ----
                                             isNull,
                                             NULL);
          MemoryContextSwitchTo(oldcontext);
+
+         if (!estate->readonly_func)
+             UnregisterSnapshot(ActiveSnapshot);
      }
      PG_CATCH();
      {
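
For anyone who wants to poke at the bookkeeping outside the backend, here is a minimal standalone sketch of the scheme in the snapmgr.c hunk: a singly linked list of refcounted entries, each tagged with the nesting level that registered it. It is not the patch code. The names DemoSnapshot, DemoElt and demo_* are hypothetical; malloc/free stand in for palloc and the snapshot memory context; the nesting level is passed in explicitly instead of coming from GetCurrentTransactionNestLevel(); and the snapshot is tracked by pointer rather than copied with CopySnapshot.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for a snapshot; only pointer identity matters here. */
typedef struct DemoSnapshot
{
    int         id;
} DemoSnapshot;

/* One list entry per distinct registered snapshot. */
typedef struct DemoElt
{
    const DemoSnapshot *snap;
    uint32_t    refcount;
    int         level;          /* nesting level that registered it */
    struct DemoElt *next;
} DemoElt;

static DemoElt *demo_head = NULL;

/* Register a snapshot, or bump its refcount if it is already listed. */
const DemoSnapshot *
demo_register(const DemoSnapshot *snap, int level)
{
    DemoElt    *elt;

    if (snap == NULL)
        return NULL;

    for (elt = demo_head; elt != NULL; elt = elt->next)
    {
        if (elt->snap == snap)
        {
            elt->refcount++;
            return elt->snap;
        }
    }

    elt = malloc(sizeof(DemoElt));
    assert(elt != NULL);
    elt->snap = snap;
    elt->refcount = 1;
    elt->level = level;
    elt->next = demo_head;
    demo_head = elt;
    return snap;
}

/* Drop one reference; returns 0 if the snapshot was never registered. */
int
demo_unregister(const DemoSnapshot *snap)
{
    DemoElt   **link;

    for (link = &demo_head; *link != NULL; link = &(*link)->next)
    {
        DemoElt    *elt = *link;

        if (elt->snap == snap)
        {
            if (--elt->refcount == 0)
            {
                *link = elt->next;  /* unlink, then release the entry */
                free(elt);
            }
            return 1;
        }
    }
    return 0;
}

/* Subtransaction abort: forget every entry registered at this level. */
void
demo_sub_abort(int level)
{
    DemoElt   **link = &demo_head;

    while (*link != NULL)
    {
        DemoElt    *elt = *link;

        if (elt->level == level)
        {
            *link = elt->next;
            free(elt);
        }
        else
            link = &elt->next;
    }
}

/* Subtransaction commit: hand this level's entries to the parent level. */
void
demo_sub_commit(int level)
{
    DemoElt    *elt;

    for (elt = demo_head; elt != NULL; elt = elt->next)
    {
        if (elt->level == level)
            elt->level--;
    }
}

/* Number of distinct snapshots still registered. */
int
demo_count(void)
{
    int         n = 0;
    DemoElt    *elt;

    for (elt = demo_head; elt != NULL; elt = elt->next)
        n++;
    return n;
}
```

The sketch walks the list through a pointer-to-link instead of carrying a separate prev pointer, which is only a stylistic choice; the unlink behaviour is meant to match the patch. A leak check in the spirit of AtEOXact_Snapshot would amount to testing demo_count() at commit.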