Stephan Szabo <[EMAIL PROTECTED]> writes:
> On Tue, 30 Sep 2003, Tom Lane wrote:
>> I think I can implement it and it will act as stated in my proposal.
>> Whether people like the proposed behavior is the big question in my
>> mind.

> I think it's more reasonable than the current behavior or any of
> the others we've hit along the way, and we have to pretty much choose
> now if we want to change it for 7.4.

I've committed the attached patch.  One thing I wanted to double-check
with you is that the SELECT FOR UPDATES done in the noaction cases are
being correctly handled.  I think it is correct to do them with the
current snapshot rather than the start-of-transaction snap; do you
agree?  Also, I did not propagate the crosscheck support into
heap_mark4update, meaning that these SELECT FOR UPDATEs won't complain
if they find a row that was inserted later than the start of the
serializable transaction.  I'm not totally sure if they should or not;
what do you think?

                        regards, tom lane

*** src/backend/access/heap/heapam.c.orig       Thu Sep 25 10:22:54 2003
--- src/backend/access/heap/heapam.c    Wed Oct  1 16:02:27 2003
***************
*** 1207,1220 ****
   * NB: do not call this directly unless you are prepared to deal with
   * concurrent-update conditions.  Use simple_heap_delete instead.
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we did delete it.  Failure return codes are
   * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
!  * (the last only possible if wait == false).
   */
  int
  heap_delete(Relation relation, ItemPointer tid,
!                       ItemPointer ctid, CommandId cid, bool wait)
  {
        ItemId          lp;
        HeapTupleData tp;
--- 1207,1229 ----
   * NB: do not call this directly unless you are prepared to deal with
   * concurrent-update conditions.  Use simple_heap_delete instead.
   *
+  *    relation - table to be modified
+  *    tid - TID of tuple to be deleted
+  *    ctid - output parameter, used only for failure case (see below)
+  *    cid - delete command ID to use in verifying tuple visibility
+  *    crosscheck - if not SnapshotAny, also check tuple against this
+  *    wait - true if should wait for any conflicting update to commit/abort
+  *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we did delete it.  Failure return codes are
   * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
!  * (the last only possible if wait == false).  On a failure return,
!  * *ctid is set to the ctid link of the target tuple (possibly a later
!  * version of the row).
   */
  int
  heap_delete(Relation relation, ItemPointer tid,
!                       ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool 
wait)
  {
        ItemId          lp;
        HeapTupleData tp;
***************
*** 1240,1246 ****
        tp.t_tableOid = relation->rd_id;
  
  l1:
!       result = HeapTupleSatisfiesUpdate(&tp, cid);
  
        if (result == HeapTupleInvisible)
        {
--- 1249,1255 ----
        tp.t_tableOid = relation->rd_id;
  
  l1:
!       result = HeapTupleSatisfiesUpdate(tp.t_data, cid);
  
        if (result == HeapTupleInvisible)
        {
***************
*** 1278,1283 ****
--- 1287,1300 ----
                else
                        result = HeapTupleUpdated;
        }
+ 
+       if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated)
+       {
+               /* Perform additional check for serializable RI updates */
+               if (!HeapTupleSatisfiesSnapshot(tp.t_data, crosscheck))
+                       result = HeapTupleUpdated;
+       }
+ 
        if (result != HeapTupleMayBeUpdated)
        {
                Assert(result == HeapTupleSelfUpdated ||
***************
*** 1378,1384 ****
  
        result = heap_delete(relation, tid,
                                                 &ctid,
!                                                GetCurrentCommandId(),
                                                 true /* wait for commit */);
        switch (result)
        {
--- 1395,1401 ----
  
        result = heap_delete(relation, tid,
                                                 &ctid,
!                                                GetCurrentCommandId(), SnapshotAny,
                                                 true /* wait for commit */);
        switch (result)
        {
***************
*** 1407,1420 ****
   * NB: do not call this directly unless you are prepared to deal with
   * concurrent-update conditions.  Use simple_heap_update instead.
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we *did* update it.  Failure return codes are
   * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
!  * (the last only possible if wait == false).
   */
  int
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
!                       ItemPointer ctid, CommandId cid, bool wait)
  {
        ItemId          lp;
        HeapTupleData oldtup;
--- 1424,1449 ----
   * NB: do not call this directly unless you are prepared to deal with
   * concurrent-update conditions.  Use simple_heap_update instead.
   *
+  *    relation - table to be modified
+  *    otid - TID of old tuple to be replaced
+  *    newtup - newly constructed tuple data to store
+  *    ctid - output parameter, used only for failure case (see below)
+  *    cid - update command ID to use in verifying old tuple visibility
+  *    crosscheck - if not SnapshotAny, also check old tuple against this
+  *    wait - true if should wait for any conflicting update to commit/abort
+  *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we *did* update it.  Failure return codes are
   * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
!  * (the last only possible if wait == false).  On a failure return,
!  * *ctid is set to the ctid link of the old tuple (possibly a later
!  * version of the row).
!  * On success, newtup->t_self is set to the TID where the new tuple
!  * was inserted.
   */
  int
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
!                       ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool 
wait)
  {
        ItemId          lp;
        HeapTupleData oldtup;
***************
*** 1450,1456 ****
         */
  
  l2:
!       result = HeapTupleSatisfiesUpdate(&oldtup, cid);
  
        if (result == HeapTupleInvisible)
        {
--- 1479,1485 ----
         */
  
  l2:
!       result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid);
  
        if (result == HeapTupleInvisible)
        {
***************
*** 1488,1493 ****
--- 1517,1530 ----
                else
                        result = HeapTupleUpdated;
        }
+ 
+       if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated)
+       {
+               /* Perform additional check for serializable RI updates */
+               if (!HeapTupleSatisfiesSnapshot(oldtup.t_data, crosscheck))
+                       result = HeapTupleUpdated;
+       }
+ 
        if (result != HeapTupleMayBeUpdated)
        {
                Assert(result == HeapTupleSelfUpdated ||
***************
*** 1718,1724 ****
  
        result = heap_update(relation, otid, tup,
                                                 &ctid,
!                                                GetCurrentCommandId(),
                                                 true /* wait for commit */);
        switch (result)
        {
--- 1755,1761 ----
  
        result = heap_update(relation, otid, tup,
                                                 &ctid,
!                                                GetCurrentCommandId(), SnapshotAny,
                                                 true /* wait for commit */);
        switch (result)
        {
***************
*** 1767,1773 ****
        tuple->t_len = ItemIdGetLength(lp);
  
  l3:
!       result = HeapTupleSatisfiesUpdate(tuple, cid);
  
        if (result == HeapTupleInvisible)
        {
--- 1804,1810 ----
        tuple->t_len = ItemIdGetLength(lp);
  
  l3:
!       result = HeapTupleSatisfiesUpdate(tuple->t_data, cid);
  
        if (result == HeapTupleInvisible)
        {
*** src/backend/commands/async.c.orig   Mon Sep 15 19:33:39 2003
--- src/backend/commands/async.c        Wed Oct  1 15:36:02 2003
***************
*** 537,543 ****
                                 */
                                result = heap_update(lRel, &lTuple->t_self, rTuple,
                                                                         &ctid,
!                                                                        
GetCurrentCommandId(),
                                                                         false /* no 
wait for commit */);
                                switch (result)
                                {
--- 537,543 ----
                                 */
                                result = heap_update(lRel, &lTuple->t_self, rTuple,
                                                                         &ctid,
!                                                                        
GetCurrentCommandId(), SnapshotAny,
                                                                         false /* no 
wait for commit */);
                                switch (result)
                                {
*** src/backend/executor/execMain.c.orig        Thu Sep 25 14:58:35 2003
--- src/backend/executor/execMain.c     Wed Oct  1 17:22:30 2003
***************
*** 104,111 ****
   * field of the QueryDesc is filled in to describe the tuples that will be
   * returned, and the internal fields (estate and planstate) are set up.
   *
!  * If useSnapshotNow is true, run the query with SnapshotNow time qual rules
!  * instead of the normal use of QuerySnapshot.
   *
   * If explainOnly is true, we are not actually intending to run the plan,
   * only to set up for EXPLAIN; so skip unwanted side-effects.
--- 104,117 ----
   * field of the QueryDesc is filled in to describe the tuples that will be
   * returned, and the internal fields (estate and planstate) are set up.
   *
!  * If useCurrentSnapshot is true, run the query with the latest available
!  * snapshot, instead of the normal QuerySnapshot.  Also, if it's an update
!  * or delete query, check that the rows to be updated or deleted would be
!  * visible to the normal QuerySnapshot.  (This is a special-case behavior
!  * needed for referential integrity updates in serializable transactions.
!  * We must check all currently-committed rows, but we want to throw a
!  * can't-serialize error if any rows that would need updates would not be
!  * visible under the normal serializable snapshot.)
   *
   * If explainOnly is true, we are not actually intending to run the plan,
   * only to set up for EXPLAIN; so skip unwanted side-effects.
***************
*** 115,121 ****
   * ----------------------------------------------------------------
   */
  void
! ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow, bool explainOnly)
  {
        EState     *estate;
        MemoryContext oldcontext;
--- 121,127 ----
   * ----------------------------------------------------------------
   */
  void
! ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot, bool explainOnly)
  {
        EState     *estate;
        MemoryContext oldcontext;
***************
*** 157,171 ****
         * the life of this query, even if it outlives the current command and
         * current snapshot.
         */
!       if (useSnapshotNow)
        {
!               estate->es_snapshot = SnapshotNow;
!               estate->es_snapshot_cid = GetCurrentCommandId();
        }
        else
        {
                estate->es_snapshot = CopyQuerySnapshot();
!               estate->es_snapshot_cid = estate->es_snapshot->curcid;
        }
  
        /*
--- 163,180 ----
         * the life of this query, even if it outlives the current command and
         * current snapshot.
         */
!       if (useCurrentSnapshot)
        {
!               /* RI update/delete query --- must use an up-to-date snapshot */
!               estate->es_snapshot = CopyCurrentSnapshot();
!               /* crosscheck updates/deletes against transaction snapshot */
!               estate->es_crosscheck_snapshot = CopyQuerySnapshot();
        }
        else
        {
+               /* normal query --- use query snapshot, no crosscheck */
                estate->es_snapshot = CopyQuerySnapshot();
!               estate->es_crosscheck_snapshot = SnapshotAny;
        }
  
        /*
***************
*** 1118,1124 ****
  
                                        tuple.t_self = *((ItemPointer) 
DatumGetPointer(datum));
                                        test = heap_mark4update(erm->relation, &tuple, 
&buffer,
!                                                                                      
 estate->es_snapshot_cid);
                                        ReleaseBuffer(buffer);
                                        switch (test)
                                        {
--- 1127,1133 ----
  
                                        tuple.t_self = *((ItemPointer) 
DatumGetPointer(datum));
                                        test = heap_mark4update(erm->relation, &tuple, 
&buffer,
!                                                                                      
 estate->es_snapshot->curcid);
                                        ReleaseBuffer(buffer);
                                        switch (test)
                                        {
***************
*** 1278,1284 ****
        if (estate->es_into_relation_descriptor != NULL)
        {
                heap_insert(estate->es_into_relation_descriptor, tuple,
!                                       estate->es_snapshot_cid);
                IncrAppended();
        }
  
--- 1287,1293 ----
        if (estate->es_into_relation_descriptor != NULL)
        {
                heap_insert(estate->es_into_relation_descriptor, tuple,
!                                       estate->es_snapshot->curcid);
                IncrAppended();
        }
  
***************
*** 1354,1360 ****
         * insert the tuple
         */
        newId = heap_insert(resultRelationDesc, tuple,
!                                               estate->es_snapshot_cid);
  
        IncrAppended();
        (estate->es_processed)++;
--- 1363,1369 ----
         * insert the tuple
         */
        newId = heap_insert(resultRelationDesc, tuple,
!                                               estate->es_snapshot->curcid);
  
        IncrAppended();
        (estate->es_processed)++;
***************
*** 1406,1412 ****
                bool            dodelete;
  
                dodelete = ExecBRDeleteTriggers(estate, resultRelInfo, tupleid,
!                                                                               
estate->es_snapshot_cid);
  
                if (!dodelete)                  /* "do nothing" */
                        return;
--- 1415,1421 ----
                bool            dodelete;
  
                dodelete = ExecBRDeleteTriggers(estate, resultRelInfo, tupleid,
!                                                                               
estate->es_snapshot->curcid);
  
                if (!dodelete)                  /* "do nothing" */
                        return;
***************
*** 1418,1424 ****
  ldelete:;
        result = heap_delete(resultRelationDesc, tupleid,
                                                 &ctid,
!                                                estate->es_snapshot_cid,
                                                 true /* wait for commit */);
        switch (result)
        {
--- 1427,1434 ----
  ldelete:;
        result = heap_delete(resultRelationDesc, tupleid,
                                                 &ctid,
!                                                estate->es_snapshot->curcid,
!                                                estate->es_crosscheck_snapshot,
                                                 true /* wait for commit */);
        switch (result)
        {
***************
*** 1517,1523 ****
  
                newtuple = ExecBRUpdateTriggers(estate, resultRelInfo,
                                                                                
tupleid, tuple,
!                                                                               
estate->es_snapshot_cid);
  
                if (newtuple == NULL)   /* "do nothing" */
                        return;
--- 1527,1533 ----
  
                newtuple = ExecBRUpdateTriggers(estate, resultRelInfo,
                                                                                
tupleid, tuple,
!                                                                               
estate->es_snapshot->curcid);
  
                if (newtuple == NULL)   /* "do nothing" */
                        return;
***************
*** 1553,1559 ****
         */
        result = heap_update(resultRelationDesc, tupleid, tuple,
                                                 &ctid,
!                                                estate->es_snapshot_cid,
                                                 true /* wait for commit */);
        switch (result)
        {
--- 1563,1570 ----
         */
        result = heap_update(resultRelationDesc, tupleid, tuple,
                                                 &ctid,
!                                                estate->es_snapshot->curcid,
!                                                estate->es_crosscheck_snapshot,
                                                 true /* wait for commit */);
        switch (result)
        {
***************
*** 2039,2045 ****
         */
        epqstate->es_direction = ForwardScanDirection;
        epqstate->es_snapshot = estate->es_snapshot;
!       epqstate->es_snapshot_cid = estate->es_snapshot_cid;
        epqstate->es_range_table = estate->es_range_table;
        epqstate->es_result_relations = estate->es_result_relations;
        epqstate->es_num_result_relations = estate->es_num_result_relations;
--- 2050,2056 ----
         */
        epqstate->es_direction = ForwardScanDirection;
        epqstate->es_snapshot = estate->es_snapshot;
!       epqstate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot;
        epqstate->es_range_table = estate->es_range_table;
        epqstate->es_result_relations = estate->es_result_relations;
        epqstate->es_num_result_relations = estate->es_num_result_relations;
*** src/backend/executor/execUtils.c.orig       Thu Sep 25 14:58:35 2003
--- src/backend/executor/execUtils.c    Wed Oct  1 15:35:55 2003
***************
*** 178,184 ****
         */
        estate->es_direction = ForwardScanDirection;
        estate->es_snapshot = SnapshotNow;
!       estate->es_snapshot_cid = FirstCommandId;
        estate->es_range_table = NIL;
  
        estate->es_result_relations = NULL;
--- 178,184 ----
         */
        estate->es_direction = ForwardScanDirection;
        estate->es_snapshot = SnapshotNow;
!       estate->es_crosscheck_snapshot = SnapshotAny; /* means no crosscheck */
        estate->es_range_table = NIL;
  
        estate->es_result_relations = NULL;
*** src/backend/executor/nodeSubplan.c.orig     Thu Sep 25 14:58:35 2003
--- src/backend/executor/nodeSubplan.c  Wed Oct  1 17:22:30 2003
***************
*** 709,715 ****
        sp_estate->es_tupleTable =
                ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10);
        sp_estate->es_snapshot = estate->es_snapshot;
!       sp_estate->es_snapshot_cid = estate->es_snapshot_cid;
        sp_estate->es_instrument = estate->es_instrument;
  
        /*
--- 709,715 ----
        sp_estate->es_tupleTable =
                ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10);
        sp_estate->es_snapshot = estate->es_snapshot;
!       sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot;
        sp_estate->es_instrument = estate->es_instrument;
  
        /*
*** src/backend/executor/nodeSubqueryscan.c.orig        Thu Sep 25 14:58:35 2003
--- src/backend/executor/nodeSubqueryscan.c     Wed Oct  1 17:22:31 2003
***************
*** 177,183 ****
        sp_estate->es_tupleTable =
                ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10);
        sp_estate->es_snapshot = estate->es_snapshot;
!       sp_estate->es_snapshot_cid = estate->es_snapshot_cid;
        sp_estate->es_instrument = estate->es_instrument;
  
        /*
--- 177,183 ----
        sp_estate->es_tupleTable =
                ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10);
        sp_estate->es_snapshot = estate->es_snapshot;
!       sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot;
        sp_estate->es_instrument = estate->es_instrument;
  
        /*
*** src/backend/executor/spi.c.orig     Thu Sep 25 14:58:35 2003
--- src/backend/executor/spi.c  Wed Oct  1 16:33:44 2003
***************
*** 33,43 ****
  
  static int    _SPI_execute(const char *src, int tcount, _SPI_plan *plan);
  static int    _SPI_pquery(QueryDesc *queryDesc, bool runit,
!                                               bool useSnapshotNow, int tcount);
  
  static int _SPI_execute_plan(_SPI_plan *plan,
                                                         Datum *Values, const char 
*Nulls,
!                                                        bool useSnapshotNow, int 
tcount);
  
  static void _SPI_cursor_operation(Portal portal, bool forward, int count,
                                          DestReceiver *dest);
--- 33,43 ----
  
  static int    _SPI_execute(const char *src, int tcount, _SPI_plan *plan);
  static int    _SPI_pquery(QueryDesc *queryDesc, bool runit,
!                                               bool useCurrentSnapshot, int tcount);
  
  static int _SPI_execute_plan(_SPI_plan *plan,
                                                         Datum *Values, const char 
*Nulls,
!                                                        bool useCurrentSnapshot, int 
tcount);
  
  static void _SPI_cursor_operation(Portal portal, bool forward, int count,
                                          DestReceiver *dest);
***************
*** 245,256 ****
  }
  
  /*
!  * SPI_execp_now -- identical to SPI_execp, except that we use SnapshotNow
!  * instead of the normal QuerySnapshot.  This is currently not documented
!  * in spi.sgml because it is only intended for use by RI triggers.
   */
  int
! SPI_execp_now(void *plan, Datum *Values, const char *Nulls, int tcount)
  {
        int                     res;
  
--- 245,258 ----
  }
  
  /*
!  * SPI_execp_current -- identical to SPI_execp, except that we expose the
!  * Executor option to use a current snapshot instead of the normal
!  * QuerySnapshot.  This is currently not documented in spi.sgml because
!  * it is only intended for use by RI triggers.
   */
  int
! SPI_execp_current(void *plan, Datum *Values, const char *Nulls,
!                                 bool useCurrentSnapshot, int tcount)
  {
        int                     res;
  
***************
*** 264,270 ****
        if (res < 0)
                return res;
  
!       res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, true, tcount);
  
        _SPI_end_call(true);
        return res;
--- 266,273 ----
        if (res < 0)
                return res;
  
!       res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls,
!                                                       useCurrentSnapshot, tcount);
  
        _SPI_end_call(true);
        return res;
***************
*** 1124,1130 ****
  
  static int
  _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
!                                 bool useSnapshotNow, int tcount)
  {
        List       *query_list_list = plan->qtlist;
        List       *plan_list = plan->ptlist;
--- 1127,1133 ----
  
  static int
  _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
!                                 bool useCurrentSnapshot, int tcount)
  {
        List       *query_list_list = plan->qtlist;
        List       *plan_list = plan->ptlist;
***************
*** 1195,1201 ****
                        {
                                qdesc = CreateQueryDesc(queryTree, planTree, dest,
                                                                                
paramLI, false);
!                               res = _SPI_pquery(qdesc, true, useSnapshotNow,
                                                                  queryTree->canSetTag 
? tcount : 0);
                                if (res < 0)
                                        return res;
--- 1198,1204 ----
                        {
                                qdesc = CreateQueryDesc(queryTree, planTree, dest,
                                                                                
paramLI, false);
!                               res = _SPI_pquery(qdesc, true, useCurrentSnapshot,
                                                                  queryTree->canSetTag 
? tcount : 0);
                                if (res < 0)
                                        return res;
***************
*** 1208,1214 ****
  }
  
  static int
! _SPI_pquery(QueryDesc *queryDesc, bool runit, bool useSnapshotNow, int tcount)
  {
        int                     operation = queryDesc->operation;
        int                     res;
--- 1211,1218 ----
  }
  
  static int
! _SPI_pquery(QueryDesc *queryDesc, bool runit,
!                       bool useCurrentSnapshot, int tcount)
  {
        int                     operation = queryDesc->operation;
        int                     res;
***************
*** 1245,1251 ****
                ResetUsage();
  #endif
  
!       ExecutorStart(queryDesc, useSnapshotNow, false);
  
        ExecutorRun(queryDesc, ForwardScanDirection, (long) tcount);
  
--- 1249,1255 ----
                ResetUsage();
  #endif
  
!       ExecutorStart(queryDesc, useCurrentSnapshot, false);
  
        ExecutorRun(queryDesc, ForwardScanDirection, (long) tcount);
  
*** src/backend/storage/ipc/sinval.c.orig       Wed Sep 24 14:54:01 2003
--- src/backend/storage/ipc/sinval.c    Wed Oct  1 16:02:42 2003
***************
*** 330,339 ****
         * lastBackend would be sufficient.  But it seems better to do the
         * malloc while not holding the lock, so we can't look at lastBackend.
         *
!        * if (snapshot->xip != NULL) no need to free and reallocate xip;
!        *
!        * We can reuse the old xip array, because MaxBackends does not change at
!        * runtime.
         */
        if (snapshot->xip == NULL)
        {
--- 330,339 ----
         * lastBackend would be sufficient.  But it seems better to do the
         * malloc while not holding the lock, so we can't look at lastBackend.
         *
!        * This does open a possibility for avoiding repeated malloc/free:
!        * since MaxBackends does not change at runtime, we can simply reuse
!        * the previous xip array if any.  (This relies on the fact that all
!        * calls pass static SnapshotData structs.)
         */
        if (snapshot->xip == NULL)
        {
*** src/backend/utils/adt/ri_triggers.c.orig    Mon Sep 29 13:44:31 2003
--- src/backend/utils/adt/ri_triggers.c Wed Oct  1 16:49:40 2003
***************
*** 157,162 ****
--- 157,163 ----
  static bool ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
                                Relation fk_rel, Relation pk_rel,
                                HeapTuple old_tuple, HeapTuple new_tuple,
+                               bool detectNewRows,
                                int expect_OK, const char *constrname);
  static void ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
                                 Relation rel, HeapTuple tuple,
***************
*** 276,281 ****
--- 277,283 ----
                ri_PerformCheck(&qkey, qplan,
                                                fk_rel, pk_rel,
                                                NULL, NULL,
+                                               false,
                                                SPI_OK_SELECT,
                                                tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 433,438 ****
--- 435,441 ----
        ri_PerformCheck(&qkey, qplan,
                                        fk_rel, pk_rel,
                                        NULL, new_row,
+                                       false,
                                        SPI_OK_SELECT,
                                        tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 594,599 ****
--- 597,603 ----
        result = ri_PerformCheck(&qkey, qplan,
                                                         fk_rel, pk_rel,
                                                         old_row, NULL,
+                                                        true,          /* treat like 
update */
                                                         SPI_OK_SELECT, NULL);
  
        if (SPI_finish() != SPI_OK_FINISH)
***************
*** 752,757 ****
--- 756,762 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_SELECT,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 942,947 ****
--- 947,953 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_SELECT,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 1102,1107 ****
--- 1108,1114 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_DELETE,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 1285,1290 ****
--- 1292,1298 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, new_row,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_UPDATE,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 1453,1458 ****
--- 1461,1467 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_SELECT,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 1633,1638 ****
--- 1642,1648 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_SELECT,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 1802,1807 ****
--- 1812,1818 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_UPDATE,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 2019,2024 ****
--- 2030,2036 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_UPDATE,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 2188,2193 ****
--- 2200,2206 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_UPDATE,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 2392,2397 ****
--- 2405,2411 ----
                        ri_PerformCheck(&qkey, qplan,
                                                        fk_rel, pk_rel,
                                                        old_row, NULL,
+                                                       true, /* must detect new rows 
*/
                                                        SPI_OK_UPDATE,
                                                        
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
  
***************
*** 2788,2798 ****
--- 2802,2814 ----
  ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
                                Relation fk_rel, Relation pk_rel,
                                HeapTuple old_tuple, HeapTuple new_tuple,
+                               bool detectNewRows,
                                int expect_OK, const char *constrname)
  {
        Relation        query_rel,
                                source_rel;
        int                     key_idx;
+       bool            useCurrentSnapshot;
        int                     limit;
        int                     spi_result;
        AclId           save_uid;
***************
*** 2842,2850 ****
                                                 vals, nulls);
        }
  
!       /* Switch to proper UID to perform check as */
!       save_uid = GetUserId();
!       SetUserId(RelationGetForm(query_rel)->relowner);
  
        /*
         * If this is a select query (e.g., for a 'no action' or 'restrict'
--- 2858,2882 ----
                                                 vals, nulls);
        }
  
!       /*
!        * In READ COMMITTED mode, we just need to make sure the regular query
!        * snapshot is up-to-date, and we will see all rows that could be
!        * interesting.  In SERIALIZABLE mode, we can't update the regular query
!        * snapshot.  If the caller passes detectNewRows == false then it's okay
!        * to do the query with the transaction snapshot; otherwise we tell the
!        * executor to force a current snapshot (and error out if it finds any
!        * rows under current snapshot that wouldn't be visible per the
!        * transaction snapshot).
!        */
!       if (XactIsoLevel == XACT_SERIALIZABLE)
!       {
!               useCurrentSnapshot = detectNewRows;
!       }
!       else
!       {
!               SetQuerySnapshot();
!               useCurrentSnapshot = false;
!       }
  
        /*
         * If this is a select query (e.g., for a 'no action' or 'restrict'
***************
*** 2854,2872 ****
         */
        limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
  
!       /*
!        * Run the plan, using SnapshotNow time qual rules so that we can see
!        * all committed tuples, even those committed after our own transaction
!        * or query started.
!        */
!       spi_result = SPI_execp_now(qplan, vals, nulls, limit);
  
        /* Restore UID */
        SetUserId(save_uid);
  
        /* Check result */
        if (spi_result < 0)
!               elog(ERROR, "SPI_execp_now returned %d", spi_result);
  
        if (expect_OK >= 0 && spi_result != expect_OK)
                ri_ReportViolation(qkey, constrname ? constrname : "",
--- 2886,2905 ----
         */
        limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
  
!       /* Switch to proper UID to perform check as */
!       save_uid = GetUserId();
!       SetUserId(RelationGetForm(query_rel)->relowner);
! 
!       /* Finally we can run the query. */
!       spi_result = SPI_execp_current(qplan, vals, nulls,
!                                                                  useCurrentSnapshot, 
limit);
  
        /* Restore UID */
        SetUserId(save_uid);
  
        /* Check result */
        if (spi_result < 0)
!               elog(ERROR, "SPI_execp_current returned %d", spi_result);
  
        if (expect_OK >= 0 && spi_result != expect_OK)
                ri_ReportViolation(qkey, constrname ? constrname : "",
*** src/backend/utils/time/tqual.c.orig Thu Sep 25 14:58:35 2003
--- src/backend/utils/time/tqual.c      Wed Oct  1 16:02:52 2003
***************
*** 26,39 ****
  #include "storage/sinval.h"
  #include "utils/tqual.h"
  
! 
! static SnapshotData SnapshotDirtyData;
! Snapshot      SnapshotDirty = &SnapshotDirtyData;
! 
  static SnapshotData QuerySnapshotData;
  static SnapshotData SerializableSnapshotData;
  Snapshot      QuerySnapshot = NULL;
  Snapshot      SerializableSnapshot = NULL;
  
  /* These are updated by GetSnapshotData: */
  TransactionId RecentXmin = InvalidTransactionId;
--- 26,44 ----
  #include "storage/sinval.h"
  #include "utils/tqual.h"
  
! /*
!  * The SnapshotData structs are static to simplify memory allocation
!  * (see the hack in GetSnapshotData to avoid repeated malloc/free).
!  */
  static SnapshotData QuerySnapshotData;
  static SnapshotData SerializableSnapshotData;
+ static SnapshotData CurrentSnapshotData;
+ static SnapshotData SnapshotDirtyData;
+ 
+ /* Externally visible pointers to valid snapshots: */
  Snapshot      QuerySnapshot = NULL;
  Snapshot      SerializableSnapshot = NULL;
+ Snapshot      SnapshotDirty = &SnapshotDirtyData;
  
  /* These are updated by GetSnapshotData: */
  TransactionId RecentXmin = InvalidTransactionId;
***************
*** 387,396 ****
   *    CurrentCommandId.
   */
  int
! HeapTupleSatisfiesUpdate(HeapTuple htuple, CommandId curcid)
  {
-       HeapTupleHeader tuple = htuple->t_data;
- 
        if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
        {
                if (tuple->t_infomask & HEAP_XMIN_INVALID)
--- 392,399 ----
   *    CurrentCommandId.
   */
  int
! HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid)
  {
        if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
        {
                if (tuple->t_infomask & HEAP_XMIN_INVALID)
***************
*** 1024,1029 ****
--- 1027,1068 ----
  }
  
  /*
+  * CopyCurrentSnapshot
+  *            Make a snapshot that is up-to-date as of the current instant,
+  *            and return a copy.
+  *
+  * The copy is palloc'd in the current memory context.
+  */
+ Snapshot
+ CopyCurrentSnapshot(void)
+ {
+       Snapshot        currentSnapshot;
+       Snapshot        snapshot;
+ 
+       if (QuerySnapshot == NULL)      /* should not be first call in xact */
+               elog(ERROR, "no snapshot has been set");
+ 
+       /* Update the static struct */
+       currentSnapshot = GetSnapshotData(&CurrentSnapshotData, false);
+       currentSnapshot->curcid = GetCurrentCommandId();
+ 
+       /* Make a copy */
+       snapshot = (Snapshot) palloc(sizeof(SnapshotData));
+       memcpy(snapshot, currentSnapshot, sizeof(SnapshotData));
+       if (snapshot->xcnt > 0)
+       {
+               snapshot->xip = (TransactionId *)
+                       palloc(snapshot->xcnt * sizeof(TransactionId));
+               memcpy(snapshot->xip, currentSnapshot->xip,
+                          snapshot->xcnt * sizeof(TransactionId));
+       }
+       else
+               snapshot->xip = NULL;
+ 
+       return snapshot;
+ }
+ 
+ /*
   * FreeXactSnapshot
   *            Free snapshot(s) at end of transaction.
   */
***************
*** 1031,1038 ****
  FreeXactSnapshot(void)
  {
        /*
!        * We do not free(QuerySnapshot->xip); or
!        * free(SerializableSnapshot->xip); they will be reused soon
         */
        QuerySnapshot = NULL;
        SerializableSnapshot = NULL;
--- 1070,1078 ----
  FreeXactSnapshot(void)
  {
        /*
!        * We do not free the xip arrays for the snapshot structs;
!        * they will be reused soon.  So this is now just a state
!        * change to prevent outside callers from accessing the snapshots.
         */
        QuerySnapshot = NULL;
        SerializableSnapshot = NULL;
*** src/include/access/heapam.h.orig    Mon Sep 15 19:33:43 2003
--- src/include/access/heapam.h Wed Oct  1 15:35:11 2003
***************
*** 155,163 ****
  
  extern Oid    heap_insert(Relation relation, HeapTuple tup, CommandId cid);
  extern int heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid,
!                       CommandId cid, bool wait);
  extern int heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
!                       ItemPointer ctid, CommandId cid, bool wait);
  extern int heap_mark4update(Relation relation, HeapTuple tup,
                                 Buffer *userbuf, CommandId cid);
  
--- 155,163 ----
  
  extern Oid    heap_insert(Relation relation, HeapTuple tup, CommandId cid);
  extern int heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid,
!                       CommandId cid, Snapshot crosscheck, bool wait);
  extern int heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
!                       ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool 
wait);
  extern int heap_mark4update(Relation relation, HeapTuple tup,
                                 Buffer *userbuf, CommandId cid);
  
*** src/include/executor/executor.h.orig        Thu Sep 25 14:58:35 2003
--- src/include/executor/executor.h     Wed Oct  1 15:35:05 2003
***************
*** 85,91 ****
  /*
   * prototypes from functions in execMain.c
   */
! extern void ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow,
                                                  bool explainOnly);
  extern TupleTableSlot *ExecutorRun(QueryDesc *queryDesc,
                        ScanDirection direction, long count);
--- 85,91 ----
  /*
   * prototypes from functions in execMain.c
   */
! extern void ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot,
                                                  bool explainOnly);
  extern TupleTableSlot *ExecutorRun(QueryDesc *queryDesc,
                        ScanDirection direction, long count);
*** src/include/executor/spi.h.orig     Thu Sep 25 14:58:36 2003
--- src/include/executor/spi.h  Wed Oct  1 16:33:39 2003
***************
*** 84,91 ****
  extern int    SPI_exec(const char *src, int tcount);
  extern int SPI_execp(void *plan, Datum *values, const char *Nulls,
                  int tcount);
! extern int SPI_execp_now(void *plan, Datum *values, const char *Nulls,
!                 int tcount);
  extern void *SPI_prepare(const char *src, int nargs, Oid *argtypes);
  extern void *SPI_saveplan(void *plan);
  extern int    SPI_freeplan(void *plan);
--- 84,91 ----
  extern int    SPI_exec(const char *src, int tcount);
  extern int SPI_execp(void *plan, Datum *values, const char *Nulls,
                  int tcount);
! extern int SPI_execp_current(void *plan, Datum *values, const char *Nulls,
!                                                        bool useCurrentSnapshot, int 
tcount);
  extern void *SPI_prepare(const char *src, int nargs, Oid *argtypes);
  extern void *SPI_saveplan(void *plan);
  extern int    SPI_freeplan(void *plan);
*** src/include/nodes/execnodes.h.orig  Thu Sep 25 14:58:36 2003
--- src/include/nodes/execnodes.h       Wed Oct  1 15:35:00 2003
***************
*** 286,292 ****
        /* Basic state for all query types: */
        ScanDirection es_direction; /* current scan direction */
        Snapshot        es_snapshot;    /* time qual to use */
!       CommandId       es_snapshot_cid;        /* CommandId component of time qual */
        List       *es_range_table; /* List of RangeTableEntrys */
  
        /* Info about target table for insert/update/delete queries: */
--- 286,292 ----
        /* Basic state for all query types: */
        ScanDirection es_direction; /* current scan direction */
        Snapshot        es_snapshot;    /* time qual to use */
!       Snapshot        es_crosscheck_snapshot; /* crosscheck time qual for RI */
        List       *es_range_table; /* List of RangeTableEntrys */
  
        /* Info about target table for insert/update/delete queries: */
*** src/include/utils/tqual.h.orig      Thu Sep 25 14:58:36 2003
--- src/include/utils/tqual.h   Wed Oct  1 16:02:21 2003
***************
*** 100,106 ****
  extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple);
  extern bool HeapTupleSatisfiesSnapshot(HeapTupleHeader tuple,
                                                   Snapshot snapshot);
! extern int HeapTupleSatisfiesUpdate(HeapTuple tuple,
                                                 CommandId curcid);
  extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTupleHeader tuple,
                                                 TransactionId OldestXmin);
--- 100,106 ----
  extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple);
  extern bool HeapTupleSatisfiesSnapshot(HeapTupleHeader tuple,
                                                   Snapshot snapshot);
! extern int HeapTupleSatisfiesUpdate(HeapTupleHeader tuple,
                                                 CommandId curcid);
  extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTupleHeader tuple,
                                                 TransactionId OldestXmin);
***************
*** 108,113 ****
--- 108,114 ----
  extern Snapshot GetSnapshotData(Snapshot snapshot, bool serializable);
  extern void SetQuerySnapshot(void);
  extern Snapshot CopyQuerySnapshot(void);
+ extern Snapshot CopyCurrentSnapshot(void);
  extern void FreeXactSnapshot(void);
  
  #endif   /* TQUAL_H */

---------------------------(end of broadcast)---------------------------
TIP 1: subscribe and unsubscribe commands go to [EMAIL PROTECTED]

Reply via email to