Intro
-----

The following patch exports 8-byte txids and snapshots to user level,
allowing their use in regular SQL.  It is based on the Slony-I xxid
module.  It provides a special 'snapshot' type for snapshots but
uses regular int8 for transaction IDs.

Exported API
------------

Type: snapshot

Functions:

  current_txid()                        returns int8
  current_snapshot()                    returns snapshot
  snapshot_xmin(snapshot)               returns int8
  snapshot_xmax(snapshot)               returns int8
  snapshot_active_list(snapshot)        returns setof int8
  snapshot_contains(snapshot, int8)     returns bool
  pg_sync_txid(int8)                    returns int8
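
As an illustration of what snapshot_contains() answers, here is a minimal
standalone sketch of the visibility rule.  The demo_snapshot type and the
demo_snapshot_contains() name are hypothetical stand-ins, not part of the
patch; the rule itself mirrors txid_snapshot_contains() in txid.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the snapshot type: xip lists the txids
 * that were still running when the snapshot was taken. */
typedef struct {
    uint64_t        xmin;   /* every txid below this is visible */
    uint64_t        xmax;   /* every txid at or above this is not visible */
    size_t          nxip;   /* number of entries in xip */
    const uint64_t *xip;    /* in-progress txids, xmin <= xip[i] < xmax */
} demo_snapshot;

/* Returns true if txid had already finished when the snapshot was taken. */
static bool demo_snapshot_contains(const demo_snapshot *snap, uint64_t txid)
{
    assert(snap != NULL && snap->xmin <= snap->xmax);

    if (txid < snap->xmin)
        return true;            /* older than anything still running */
    if (txid >= snap->xmax)
        return false;           /* started after the snapshot */

    for (size_t i = 0; i < snap->nxip; i++)
        if (snap->xip[i] == txid)
            return false;       /* was still in progress */

    return true;
}
```

With xmin = 100, xmax = 110 and running txids {102, 105}, txid 101 is
visible while 102 and 110 are not.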

Operation
---------

Extension to 8 bytes is done by keeping track of the wraparound count
in pg_control.  On every checkpoint, nextxid is compared to the one
stored in pg_control.  If the new value is smaller, a wraparound has
happened and the epoch is increased.

When a long txid or snapshot is requested, pg_control is locked with
LW_SHARED to retrieve the epoch value from it.  The patch does not
affect core functionality in any other way.
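
Once the epoch has been read, a 32-bit xid is widened to 64 bits roughly
as below.  This is a sketch under assumptions: demo_to_txid() is a
hypothetical name, the signed-difference test stands in for
TransactionIdPrecedes()/TransactionIdFollows(), and the special xids
(below FirstNormalTransactionId) are ignored.

```c
#include <assert.h>
#include <stdint.h>

/* Widen a 32-bit xid using the (epoch, epoch_xid) pair from the last
 * checkpoint.  The xid may lie on either side of a wraparound relative
 * to epoch_xid, so the epoch is adjusted by one when needed. */
static uint64_t demo_to_txid(uint32_t xid, uint32_t epoch, uint32_t epoch_xid)
{
    uint64_t e = epoch;
    /* modulo-2^32 distance: negative means xid is logically older */
    int32_t  diff = (int32_t) (xid - epoch_xid);

    /* an older xid from before the wrap needs a previous epoch to exist */
    assert(epoch > 0 || !(diff < 0 && xid > epoch_xid));

    if (diff < 0 && xid > epoch_xid)
        e--;    /* older but numerically larger: previous epoch */
    else if (diff > 0 && xid < epoch_xid)
        e++;    /* newer but numerically smaller: next epoch */

    return (e << 32) | xid;
}
```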

Backup/restore of txid data
---------------------------

Currently I made pg_dumpall output the following statement:

  "SELECT pg_sync_txid(%lld)", current_txid()

Then, on the target database, pg_sync_txid checks whether its current
(epoch + GetTopTransactionId()) is larger than the given argument.
If not, it bumps the epoch until it is, thus guaranteeing that
newly issued txids are larger than in the source database.  If restored
into the same database instance, nothing will happen.
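
The epoch adjustment can be sketched as a pure function.  demo_sync_epoch()
is a hypothetical name; the two comparisons follow SyncTxidEpoch() in
xlog.c, with locking, the pg_control write and the MAX_EPOCH check
left out.

```c
#include <assert.h>
#include <stdint.h>

/* Return the epoch the target must use so that (epoch, cur_xid)
 * is strictly greater than sync_txid taken from the source. */
static uint32_t demo_sync_epoch(uint32_t epoch, uint32_t cur_xid,
                                uint64_t sync_txid)
{
    uint32_t sync_epoch = (uint32_t) (sync_txid >> 32);
    uint32_t sync_xid = (uint32_t) sync_txid;

    assert(sync_epoch < UINT32_MAX);    /* leave room for the bump below */

    /* catch up with the source's epoch if we are behind */
    if (epoch < sync_epoch)
        epoch = sync_epoch;

    /* same epoch but our xid is not ahead yet: move to the next epoch */
    if (epoch == sync_epoch && cur_xid <= sync_xid)
        epoch++;

    return epoch;
}
```

A target that is already ahead of the source keeps its epoch unchanged,
which is why restoring into the same instance is a no-op.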


Advantages of 8-byte txids
--------------------------

* Indexes won't break silently.  No need for a mandatory periodic
  truncate, which may not happen for various reasons.
* Allows keeping values from different databases in one table/index.
* Ability to bring data to a different server and continue there.

Advantages of being in core
---------------------------

* Core code can guarantee that the wraparound check happens within 2G
  transactions.
* Core code can update pg_control non-transactionally.  A module
  needs to operate inside a user transaction when updating the epoch
  row, which brings various problems (READ COMMITTED vs. SERIALIZABLE,
  long transactions, locking, etc).
* Core code has only one place that it needs to update; a module
  needs to have an epoch table in each database.

Todo, tothink
-------------

* Flesh out the documentation.  Probably needs some background.
* Better names for some functions?
* pg_sync_txid allows use of pg_dump for moving a database,
  but also adds the possibility of shooting yourself in the foot by
  allowing epoch wraparound to happen.  Is "Don't do it then" enough?
* Currently txid keeps its own copy of nextxid in pg_control,
  which makes the data dependencies clear.  It's possible to drop it
  and use ->checkPointCopy->nextXid directly, thus saving 4 bytes.
* Should pg_sync_txid() be issued by pg_dump instead of pg_dumpall?

-- 
marko

Index: pgsql/doc/src/sgml/func.sgml
===================================================================
*** pgsql.orig/doc/src/sgml/func.sgml
--- pgsql/doc/src/sgml/func.sgml
*************** SELECT pg_type_is_visible('myschema.widg
*** 9699,9704 ****
--- 9699,9805 ----
      databases within each cluster and their descriptions are stored globally
      as well.
     </para>
+ 
+    <para>
+      <xref linkend="functions-info-txid"> shows functions
+      querying transaction IDs and snapshots.
+    </para>
+ 
+    <table id="functions-info-txid">
+     <title>Transaction state functions</title>
+     <tgroup cols="3">
+      <thead>
+       <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry></row>
+      </thead>
+ 
+      <tbody>
+       <row>
+        <entry><literal><function>current_txid</function>()</literal></entry>
+        <entry><type>int8</type></entry>
+        <entry>transaction ID of current transaction</entry>
+       </row>
+ 
+       <row>
+        <entry><literal><function>current_snapshot</function>()</literal></entry>
+        <entry><type>snapshot</type></entry>
+        <entry>snapshot for current transaction (or statement)</entry>
+       </row>
+ 
+       <row>
+        <entry><literal><function>snapshot_xmin</function>(<type>snapshot</type>)</literal></entry>
+        <entry><type>int8</type></entry>
+        <entry>minimal txid in snapshot - all transactions below are visible in snapshot</entry>
+       </row>
+ 
+       <row>
+        <entry><literal><function>snapshot_xmax</function>(<type>snapshot</type>)</literal></entry>
+        <entry><type>int8</type></entry>
+        <entry>maximal txid in snapshot - all transactions above are not visible in snapshot</entry>
+       </row>
+ 
+       <row>
+        <entry><literal><function>snapshot_active_list</function>(<type>snapshot</type>)</literal></entry>
+        <entry><type>setof int8</type></entry>
+        <entry>list of active txid-s in snapshot</entry>
+       </row>
+ 
+       <row>
+        <entry><literal><function>snapshot_contains</function>(<type>snapshot</type>, <type>int8</type>)</literal></entry>
+        <entry><type>bool</type></entry>
+        <entry>checks if txid is visible in snapshot</entry>
+       </row>
+ 
+       <row>
+        <entry><literal><function>pg_sync_txid</function>(<type>int8</type>)</literal></entry>
+        <entry><type>int8</type></entry>
+        <entry>makes sure the following txids will be greater than given argument</entry>
+       </row>
+ 
+      </tbody>
+     </tgroup>
+    </table>
+ 
+    <indexterm zone="functions-info">
+     <primary>txid</primary>
+     <secondary>current</secondary>
+    </indexterm>
+ 
+    <indexterm zone="functions-info">
+     <primary>snapshot</primary>
+     <secondary>current</secondary>
+    </indexterm>
+ 
+    <indexterm zone="functions-info">
+     <primary>current_txid</primary>
+    </indexterm>
+    <indexterm zone="functions-info">
+     <primary>current_snapshot</primary>
+    </indexterm>
+    <indexterm zone="functions-info">
+     <primary>snapshot_xmin</primary>
+    </indexterm>
+    <indexterm zone="functions-info">
+     <primary>snapshot_xmax</primary>
+    </indexterm>
+    <indexterm zone="functions-info">
+     <primary>snapshot_contains</primary>
+    </indexterm>
+    <indexterm zone="functions-info">
+     <primary>snapshot_active_list</primary>
+    </indexterm>
+    <indexterm zone="functions-info">
+     <primary>pg_sync_txid</primary>
+    </indexterm>
+ 
+    <para>
+     A snapshot is basically a list of running transactions.
+    </para>
+    <note>
+      <para>
+        TODO: describe usage.
+      </para>
+    </note>
+ 
    </sect1>
  
   <sect1 id="functions-admin">
Index: pgsql/src/include/access/xlogutils.h
===================================================================
*** pgsql.orig/src/include/access/xlogutils.h
--- pgsql/src/include/access/xlogutils.h
***************
*** 13,18 ****
--- 13,19 ----
  
  #include "storage/buf.h"
  #include "utils/rel.h"
+ #include "catalog/pg_control.h"
  
  
  extern void XLogInitRelationCache(void);
*************** extern void XLogTruncateRelation(RelFile
*** 26,29 ****
--- 27,33 ----
  
  extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init);
  
+ extern void GetTxidEpoch(TxidEpoch *dst);
+ extern void SyncTxidEpoch(uint64 sync_txid);
+ 
  #endif
Index: pgsql/src/include/catalog/pg_control.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_control.h
--- pgsql/src/include/catalog/pg_control.h
*************** typedef enum DBState
*** 62,67 ****
--- 62,75 ----
  #define LOCALE_NAME_BUFLEN    128
  
  /*
+  * Long txid in expanded form.
+  */
+ typedef struct {
+       uint32                  epoch;          /* epoch value */
+       TransactionId   epoch_xid;      /* corresponding xid */
+ } TxidEpoch;
+ 
+ /*
   * Contents of pg_control.
   *
   * NOTE: try to keep this under 512 bytes so that it will fit on one physical
*************** typedef struct ControlFileData
*** 143,148 ****
--- 151,159 ----
        char            lc_collate[LOCALE_NAME_BUFLEN];
        char            lc_ctype[LOCALE_NAME_BUFLEN];
  
+       /* external txid tracking */
+       TxidEpoch       txid_epoch;
+ 
        /* CRC of all above ... MUST BE LAST! */
        pg_crc32        crc;
  } ControlFileData;
Index: pgsql/src/include/utils/builtins.h
===================================================================
*** pgsql.orig/src/include/utils/builtins.h
--- pgsql/src/include/utils/builtins.h
*************** extern Datum pg_prepared_statement(PG_FU
*** 886,889 ****
--- 886,904 ----
  /* utils/mmgr/portalmem.c */
  extern Datum pg_cursor(PG_FUNCTION_ARGS);
  
+ /* utils/adt/txid.c */
+ extern Datum txid_current_txid(PG_FUNCTION_ARGS);
+ extern Datum txid_current_snapshot(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_in(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_out(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_recv(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_send(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_contains(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_xmin(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_xmax(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_active_list(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_from_text(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_to_text(PG_FUNCTION_ARGS);
+ extern Datum pg_sync_txid(PG_FUNCTION_ARGS);
+ 
  #endif   /* BUILTINS_H */
Index: pgsql/src/backend/access/transam/xlog.c
===================================================================
*** pgsql.orig/src/backend/access/transam/xlog.c
--- pgsql/src/backend/access/transam/xlog.c
*************** static void xlog_outrec(StringInfo buf, 
*** 501,506 ****
--- 501,507 ----
  static bool read_backup_label(XLogRecPtr *checkPointLoc);
  static void remove_backup_label(void);
  static void rm_redo_error_callback(void *arg);
+ static void update_txid_epoch(TxidEpoch *state, TransactionId cur_xid);
  
  
  /*
*************** CreateCheckPoint(bool shutdown, bool for
*** 5378,5383 ****
--- 5379,5387 ----
        ControlFile->checkPoint = ProcLastRecPtr;
        ControlFile->checkPointCopy = checkPoint;
        ControlFile->time = time(NULL);
+ 
+       update_txid_epoch(&ControlFile->txid_epoch, checkPoint.nextXid);
+ 
        UpdateControlFile();
        LWLockRelease(ControlFileLock);
  
*************** rm_redo_error_callback(void *arg)
*** 6143,6145 ****
--- 6147,6230 ----
  
        pfree(buf.data);
  }
+ 
+ /*
+  * As MAX_INT64 is used for InvalidTransactionId,
+  * don't allow 0x7FFFFFFF as epoch value, so that no valid
+  * transaction can get its value.
+  */
+ #define MAX_EPOCH  0x7FFFFFFE
+ 
+ /*
+  * Per-checkpoint refresh of pg_control values.
+  *
+  * Increases epoch if wraparound happened.
+  */
+ static void update_txid_epoch(TxidEpoch *state, TransactionId cur_xid)
+ {
+       if (cur_xid < state->epoch_xid)
+       {
+               state->epoch++;
+               if (state->epoch > MAX_EPOCH)
+               {
+                       elog(WARNING, "txid epoch wraparound");
+                       state->epoch = 0;
+               }
+       }
+       state->epoch_xid = cur_xid;
+ }
+ 
+ /*
+  * Return current txid_epoch.
+  */
+ void GetTxidEpoch(TxidEpoch *dst)
+ {
+       LWLockAcquire(ControlFileLock, LW_SHARED);
+       *dst = ControlFile->txid_epoch;
+       LWLockRelease(ControlFileLock);
+ }
+ 
+ /*
+  * Checks if newly issued txids are greater than txid_sync.
+  * Increases epoch if needed.
+  */
+ void SyncTxidEpoch(uint64 txid_sync)
+ {
+       TransactionId cur_xid = GetCurrentTransactionId();
+       TransactionId sync_xid = (TransactionId)txid_sync;
+       uint32 sync_epoch = txid_sync >> 32;
+       int change = 0;
+       TxidEpoch *state;
+ 
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+ 
+       state = &ControlFile->txid_epoch;
+ 
+       if (state->epoch < sync_epoch)
+       {
+               state->epoch = sync_epoch;
+               change = 1;
+       }
+       if (state->epoch == sync_epoch)
+       {
+               if (cur_xid <= sync_xid)
+               {
+                       state->epoch++;
+                       change = 1;
+               }
+       }
+ 
+       if (change)
+       {
+               if (state->epoch > MAX_EPOCH)
+               {
+                       elog(WARNING, "txid epoch wraparound");
+                       state->epoch = 0;
+               }
+               state->epoch_xid = cur_xid;
+               UpdateControlFile();
+       }
+ 
+       LWLockRelease(ControlFileLock);
+ }
+ 
Index: pgsql/src/backend/utils/adt/txid.c
===================================================================
*** /dev/null
--- pgsql/src/backend/utils/adt/txid.c
***************
*** 0 ****
--- 1,507 ----
+ /*-------------------------------------------------------------------------
+  * txid.c
+  *
+  *    Safe handling of transaction ID's.
+  *
+  *    Copyright (c) 2003-2004, PostgreSQL Global Development Group
+  *    Author: Jan Wieck, Afilias USA INC.
+  *
+  *    Extend to 8-byte: Marko Kreen, Skype Technologies
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "postgres.h"
+ 
+ #include <limits.h>
+ 
+ #include "access/xact.h"
+ #include "access/transam.h"
+ #include "executor/spi.h"
+ #include "libpq/pqformat.h"
+ #include "miscadmin.h"
+ #include "utils/array.h"
+ #include "utils/lsyscache.h"
+ #include "catalog/catversion.h"
+ #include "catalog/pg_control.h"
+ #include "funcapi.h"
+ 
+ #ifdef INT64_IS_BUSTED
+ #error txid needs working int64
+ #endif
+ 
+ #define MAX_INT64  0x7FFFFFFFFFFFFFFFLL
+ 
+ /* Use unsigned variant internally */
+ typedef uint64 txid;
+ 
+ /*
+  * In-memory representation of snapshot.
+  */
+ typedef struct
+ {
+       int32           varsz;
+       uint32          nxip;
+       txid            xmin;
+       txid            xmax;
+       txid            xip[1];
+ }     txid_snapshot;
+ 
+ /*
+  * txid_snapshot_active_list needs to remember
+  * state between function calls
+  */
+ struct snap_state {
+       int pos;
+       txid_snapshot *snap;
+ };
+ 
+ /*
+  * do a TransactionId -> txid conversion
+  */
+ static txid convert_xid(TransactionId xid, const TxidEpoch *state)
+ {
+       uint64 epoch;
+ 
+       /* avoid issues with the special meaning of 0 */
+       if (xid == InvalidTransactionId)
+               return MAX_INT64;
+ 
+       /* return special xid's as-is */
+       if (xid < FirstNormalTransactionId)
+               return xid;
+ 
+       /* xid can be on either side of the wrap-around */
+       epoch = state->epoch;
+       if (TransactionIdPrecedes(xid, state->epoch_xid)) {
+               if (xid > state->epoch_xid)
+                       epoch--;
+       } else if (TransactionIdFollows(xid, state->epoch_xid)) {
+               if (xid < state->epoch_xid)
+                       epoch++;
+       }
+       return (epoch << 32) | xid;
+ }
+ 
+ static int _cmp_txid(const void *aa, const void *bb)
+ {
+       const uint64 *a = aa;
+       const uint64 *b = bb;
+       if (*a < *b)
+               return -1;
+       if (*a > *b)
+               return 1;
+       return 0;
+ }
+ 
+ static void sort_snapshot(txid_snapshot *snap)
+ {
+       qsort(snap->xip, snap->nxip, sizeof(txid), _cmp_txid);
+ }
+ 
+ /*
+  * Convert a cstring to txid_snapshot
+  */
+ static txid_snapshot *
+ parse_snapshot(const char *str)
+ {
+       int     a_size;
+       txid *xip;
+ 
+       int                     a_used = 0;
+       txid            xmin;
+       txid            xmax;
+       txid            last_val = 0, val;
+       txid_snapshot *snap;
+       int                     size;
+ 
+       char       *endp;
+ 
+       a_size = 1024;
+       xip = (txid *) palloc(sizeof(txid) * a_size);
+ 
+       xmin = (txid) strtoull(str, &endp, 0);
+       if (*endp != ':')
+               elog(ERROR, "illegal txid_snapshot input format");
+       str = endp + 1;
+ 
+       xmax = (txid) strtoull(str, &endp, 0);
+       if (*endp != ':')
+               elog(ERROR, "illegal txid_snapshot input format");
+       str = endp + 1;
+ 
+       /* it should look sane */
+       if (xmin >= xmax || xmin > MAX_INT64 || xmax > MAX_INT64
+                       || xmin == 0 || xmax == 0)
+               elog(ERROR, "illegal txid_snapshot input format");
+ 
+       while (*str != '\0')
+       {
+               if (a_used >= a_size)
+               {
+                       a_size *= 2;
+                       xip = (txid *) repalloc(xip, sizeof(txid) * a_size);
+               }
+ 
+               /* read next value */
+               if (*str == '\'')
+               {
+                       str++;
+                       val = (txid) strtoull(str, &endp, 0);
+                       if (*endp != '\'')
+                               elog(ERROR, "illegal txid_snapshot input format");
+                       str = endp + 1;
+               }
+               else
+               {
+                       val = (txid) strtoull(str, &endp, 0);
+                       str = endp;
+               }
+ 
+               /* require the input to be in order */
+               if (val < xmin || val <= last_val || val >= xmax)
+                       elog(ERROR, "illegal txid_snapshot input format");
+ 
+               xip[a_used++] = val;
+               last_val = val;
+ 
+               if (*str == ',')
+                       str++;
+               else
+               {
+                       if (*str != '\0')
+                               elog(ERROR, "illegal txid_snapshot input format");
+               }
+       }
+ 
+       size = offsetof(txid_snapshot, xip) + sizeof(txid) * a_used;
+       snap = (txid_snapshot *) palloc(size);
+       snap->varsz = size;
+       snap->xmin = xmin;
+       snap->xmax = xmax;
+       snap->nxip = a_used;
+       if (a_used > 0)
+               memcpy(&(snap->xip[0]), xip, sizeof(txid) * a_used);
+       pfree(xip);
+ 
+       return snap;
+ }
+ 
+ static txid
+ get_current_txid()
+ {
+       TxidEpoch state;
+       GetTxidEpoch(&state);
+       return convert_xid(GetTopTransactionId(), &state);
+ }
+ 
+ /*
+  * Return the current transaction ID as txid
+  */
+ Datum
+ txid_current_txid(PG_FUNCTION_ARGS)
+ {
+       PG_RETURN_INT64(get_current_txid());
+ }
+ 
+ /*
+  * Return current snapshot
+  */
+ Datum
+ txid_current_snapshot(PG_FUNCTION_ARGS)
+ {
+       txid_snapshot *snap;
+       unsigned num, i, size;
+       TxidEpoch state;
+ 
+       if (SerializableSnapshot == NULL)
+               elog(ERROR, "current_snapshot: SerializableSnapshot == NULL");
+ 
+       GetTxidEpoch(&state);
+ 
+       num = SerializableSnapshot->xcnt;
+       size = offsetof(txid_snapshot, xip) + sizeof(txid) * num;
+       snap = palloc(size);
+       snap->varsz = size;
+       snap->xmin = convert_xid(SerializableSnapshot->xmin, &state);
+       snap->xmax = convert_xid(SerializableSnapshot->xmax, &state);
+       snap->nxip = num;
+       for (i = 0; i < num; i++)
+               snap->xip[i] = convert_xid(SerializableSnapshot->xip[i], &state);
+ 
+       /* we want guaranteed ascending order */
+       sort_snapshot(snap);
+ 
+       PG_RETURN_POINTER(snap);
+ }
+ 
+ /*
+  * Return snapshot's xmin
+  */
+ Datum
+ txid_snapshot_xmin(PG_FUNCTION_ARGS)
+ {
+       txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+       int64           res = snap->xmin;
+ 
+       PG_FREE_IF_COPY(snap, 0);
+       PG_RETURN_INT64(res);
+ }
+ 
+ /*
+  * Return snapshot's xmax
+  */
+ Datum
+ txid_snapshot_xmax(PG_FUNCTION_ARGS)
+ {
+       txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+       int64           res = snap->xmax;
+ 
+       PG_FREE_IF_COPY(snap, 0);
+       PG_RETURN_INT64(res);
+ }
+ 
+ /*
+  * returns uncommitted TXID's in snapshot.
+  */
+ Datum
+ txid_snapshot_active_list(PG_FUNCTION_ARGS)
+ {
+       FuncCallContext *fctx;
+       struct snap_state *state;
+ 
+       if (SRF_IS_FIRSTCALL()) {
+               txid_snapshot *snap;
+               int statelen;
+ 
+               snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+ 
+               fctx = SRF_FIRSTCALL_INIT();
+               statelen = sizeof(*state) + snap->varsz;
+               state = MemoryContextAlloc(fctx->multi_call_memory_ctx, statelen);
+               state->pos = 0;
+               state->snap = (txid_snapshot *)((char *)state + sizeof(*state));
+               memcpy(state->snap, snap, snap->varsz);
+               fctx->user_fctx = state;
+               PG_FREE_IF_COPY(snap, 0);
+       }
+       fctx = SRF_PERCALL_SETUP();
+       state = fctx->user_fctx;
+       if (state->pos < state->snap->nxip) {
+               Datum res = Int64GetDatum(state->snap->xip[state->pos]);
+               state->pos++;
+               SRF_RETURN_NEXT(fctx, res);
+       } else {
+               SRF_RETURN_DONE(fctx);
+       }
+ }
+ 
+ /*
+  * input function for type txid_snapshot
+  */
+ Datum
+ txid_snapshot_in(PG_FUNCTION_ARGS)
+ {
+       txid_snapshot *snap;
+       char       *str = PG_GETARG_CSTRING(0);
+ 
+       snap = parse_snapshot(str);
+       PG_RETURN_POINTER(snap);
+ }
+ 
+ /*
+  * output function for type txid_snapshot
+  */
+ Datum
+ txid_snapshot_out(PG_FUNCTION_ARGS)
+ {
+       txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+ 
+       char       *str = palloc(60 + snap->nxip * 30);
+       char       *cp = str;
+       int                     i;
+ 
+       snprintf(str, 60, "%llu:%llu:",
+                       (unsigned long long)snap->xmin,
+                       (unsigned long long)snap->xmax);
+       cp = str + strlen(str);
+ 
+       for (i = 0; i < snap->nxip; i++)
+       {
+               snprintf(cp, 30, "%llu%s",
+                               (unsigned long long)snap->xip[i],
+                                (i < snap->nxip - 1) ? "," : "");
+               cp += strlen(cp);
+       }
+ 
+       PG_FREE_IF_COPY(snap, 0);
+ 
+       PG_RETURN_CSTRING(str);
+ }
+ 
+ /*
+  * convert text to txid_snapshot
+  */
+ Datum
+ txid_snapshot_from_text(PG_FUNCTION_ARGS)
+ {
+       text            *txt = PG_GETARG_TEXT_P(0);
+       txid_snapshot *snap;
+       char       *str;
+       int                     len;
+ 
+       len = VARSIZE(txt) - VARHDRSZ;
+       str = palloc(len + 1);
+       memcpy(str, VARDATA(txt), len);
+       str[len] = 0;
+ 
+       snap = parse_snapshot(str);
+ 
+       pfree(str);
+       PG_FREE_IF_COPY(txt, 0);
+ 
+       PG_RETURN_POINTER(snap);
+ }
+ 
+ /*
+  * convert txid_snapshot to text
+  */
+ Datum
+ txid_snapshot_to_text(PG_FUNCTION_ARGS)
+ {
+       txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+ 
+       text       *res = palloc(VARHDRSZ + 60 + snap->nxip * 30);
+       char       *str = VARDATA(res);
+       char       *cp = str;
+       int                     i;
+ 
+       snprintf(str, 60, "%llu:%llu:",
+                       (unsigned long long)snap->xmin,
+                       (unsigned long long)snap->xmax);
+       cp = str + strlen(str);
+ 
+       for (i = 0; i < snap->nxip; i++)
+       {
+               snprintf(cp, 30, "%llu%s",
+                               (unsigned long long)snap->xip[i],
+                                (i < snap->nxip - 1) ? "," : "");
+               cp += strlen(cp);
+       }
+ 
+       VARATT_SIZEP(res) = VARHDRSZ + cp - str;
+ 
+       PG_FREE_IF_COPY(snap, 0);
+ 
+       PG_RETURN_TEXT_P(res);
+ }
+ 
+ /*
+  * read binary representation
+  */
+ Datum
+ txid_snapshot_recv(PG_FUNCTION_ARGS)
+ {
+       StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+       txid_snapshot *snap;
+       unsigned i, count, size;
+       txid val;
+ 
+       count = pq_getmsgint(buf, 4);
+       size = offsetof(txid_snapshot, xip) + sizeof(txid) * count;
+       snap = palloc(size);
+ 
+       snap->varsz = size;
+       snap->nxip = count;
+       snap->xmin = pq_getmsgint64(buf);
+       snap->xmax = pq_getmsgint64(buf);
+       val = snap->xmin;
+       for (i = 0; i < count; i++) {
+               unsigned delta;
+               delta = pq_getmsgint(buf, 2);
+               if (delta & 0x8000)
+                       val += ((delta & 0x7FFF) << 16) + pq_getmsgint(buf, 2);
+               else
+                       val += delta;
+               if (val < snap->xmin || val > snap->xmax)
+                       elog(ERROR, "corrupt snapshot data");
+               snap->xip[i] = val;
+       }
+ 
+       PG_RETURN_POINTER(snap);
+ }
+ 
+ /*
+  * binary storage
+  */
+ Datum
+ txid_snapshot_send(PG_FUNCTION_ARGS)
+ {
+       int i;
+       txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+       StringInfoData buf;
+       txid val;
+ 
+       pq_begintypsend(&buf);
+       pq_sendint(&buf, snap->nxip, 4);
+       pq_sendint64(&buf, snap->xmin);
+       pq_sendint64(&buf, snap->xmax);
+       val = snap->xmin;
+       for (i = 0; i < snap->nxip; i++) {
+               unsigned delta = (unsigned)(snap->xip[i] - val);
+               val = snap->xip[i];
+               if (delta > 0x7FFF) {
+                       pq_sendint(&buf, (delta >> 16) | 0x8000, 2);
+                       pq_sendint(&buf, delta & 0xFFFF, 2);
+               } else {
+                       pq_sendint(&buf, delta, 2);
+               }
+       }
+ 
+       PG_FREE_IF_COPY(snap, 0);
+ 
+       PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+ }
+ 
+ /*
+  * checks if txid is visible in snapshot?
+  */
+ Datum
+ txid_snapshot_contains(PG_FUNCTION_ARGS)
+ {
+       txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+       txid            value = PG_GETARG_INT64(1);
+       int                     i;
+ 
+       if (value < snap->xmin)
+               PG_RETURN_BOOL(true);
+ 
+       if (value >= snap->xmax)
+               PG_RETURN_BOOL(false);
+ 
+       for (i = 0; i < snap->nxip; i++)
+       {
+               if (value == snap->xip[i])
+                       PG_RETURN_BOOL(false);
+       }
+ 
+       PG_FREE_IF_COPY(snap, 0);
+ 
+       PG_RETURN_BOOL(true);
+ }
+ 
+ /*
+  * Bump epoch to make following txids greater than sync_txid.
+  */
+ Datum
+ pg_sync_txid(PG_FUNCTION_ARGS)
+ {
+       int64      sync_txid = PG_GETARG_INT64(0);
+ 
+       if (!superuser())
+               elog(ERROR, "must be superuser to use pg_sync_txid");
+ 
+       SyncTxidEpoch(sync_txid);
+ 
+       PG_RETURN_INT64(get_current_txid());
+ }
+ 
Index: pgsql/src/backend/utils/adt/Makefile
===================================================================
*** pgsql.orig/src/backend/utils/adt/Makefile
--- pgsql/src/backend/utils/adt/Makefile
*************** OBJS = acl.o arrayfuncs.o array_userfunc
*** 25,31 ****
        tid.o timestamp.o varbit.o varchar.o varlena.o version.o xid.o \
        network.o mac.o inet_net_ntop.o inet_net_pton.o \
        ri_triggers.o pg_lzcompress.o pg_locale.o formatting.o \
!       ascii.o quote.o pgstatfuncs.o encode.o dbsize.o genfile.o
  
  like.o: like.c like_match.c
  
--- 25,31 ----
        tid.o timestamp.o varbit.o varchar.o varlena.o version.o xid.o \
        network.o mac.o inet_net_ntop.o inet_net_pton.o \
        ri_triggers.o pg_lzcompress.o pg_locale.o formatting.o \
!       ascii.o quote.o pgstatfuncs.o encode.o dbsize.o genfile.o txid.o
  
  like.o: like.c like_match.c
  
Index: pgsql/src/include/catalog/pg_proc.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_proc.h
--- pgsql/src/include/catalog/pg_proc.h
*************** DESCR("List all files in a directory");
*** 3086,3091 ****
--- 3086,3118 ----
  DATA(insert OID = 2626 ( pg_sleep                     PGNSP PGUID 12 f f t f v 1 2278 "701" _null_ _null_ _null_ pg_sleep - _null_ ));
  DESCR("Sleep for the specified time in seconds");
  
+ DATA(insert OID = 2789 ( current_txid         PGNSP PGUID 12 f f t f v 0 20 "" _null_ _null_ _null_ txid_current_txid - _null_ ));
+ DESCR("Get current top transaction id");
+ DATA(insert OID = 2790 ( current_snapshot     PGNSP PGUID 12 f f t f v 0 2800 "" _null_ _null_ _null_ txid_current_snapshot - _null_ ));
+ DESCR("Returns current snapshot.");
+ 
+ DATA(insert OID = 2791 ( snapshot_in                  PGNSP PGUID 12 f f t f i 1 2800 "2275" _null_ _null_ _null_  txid_snapshot_in - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2792 ( snapshot_out                 PGNSP PGUID 12 f f t f i 1 2275 "2800" _null_ _null_ _null_  txid_snapshot_out - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2793 ( snapshot_recv                        PGNSP PGUID 12 f f t f i 1 2800 "2281" _null_ _null_ _null_     txid_snapshot_recv - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2794 ( snapshot_send                        PGNSP PGUID 12 f f t f i 1 17 "2800" _null_ _null_ _null_       txid_snapshot_send - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2795 ( snapshot_xmin                        PGNSP PGUID 12 f f t f i 1 20 "2800" _null_ _null_ _null_       txid_snapshot_xmin - _null_ ));
+ DESCR("Returns snapshot's xmin.");
+ DATA(insert OID = 2796 ( snapshot_xmax                        PGNSP PGUID 12 f f t f i 1 20 "2800" _null_ _null_ _null_       txid_snapshot_xmax - _null_ ));
+ DESCR("Returns snapshot's xmax.");
+ DATA(insert OID = 2797 ( snapshot_contains            PGNSP PGUID 12 f f t f i 2 16 "2800 20" _null_ _null_ _null_    txid_snapshot_contains - _null_ ));
+ DESCR("Checks whether txid is in snapshot.");
+ DATA(insert OID = 2798 ( snapshot_active_list PGNSP PGUID 12 f f t t i 1 20 "2800" _null_ _null_ _null_       txid_snapshot_active_list - _null_ ));
+ DESCR("Active transactions between snapshot xmin and xmax.");
+ DATA(insert OID = 2799 ( pg_sync_txid                 PGNSP PGUID 12 f f t f v 1 20 "20" _null_ _null_ _null_ pg_sync_txid - _null_ ));
+ DESCR("Synchronizes epoch so that new txids will be larger than given txid");
+ DATA(insert OID = 2803 ( snapshot                             PGNSP PGUID 12 f f t f i 1 2800 "25" _null_ _null_ _null_       txid_snapshot_from_text - _null_ ));
+ DESCR("Converts text to snapshot");
+ DATA(insert OID = 2804 ( text                                 PGNSP PGUID 12 f f t f i 1 25 "2800" _null_ _null_ _null_       txid_snapshot_to_text - _null_ ));
+ DESCR("Converts snapshot to text");
  
  /* Aggregates (moved here from pg_aggregate for 7.3) */
  
Index: pgsql/src/include/catalog/pg_type.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_type.h
--- pgsql/src/include/catalog/pg_type.h
*************** DATA(insert OID = 2210 ( _regclass         PG
*** 518,523 ****
--- 518,528 ----
  DATA(insert OID = 2211 ( _regtype        PGNSP PGUID -1 f b t \054 0 2206 array_in array_out array_recv array_send - i x f 0 -1 0 _null_ _null_ ));
  #define REGTYPEARRAYOID       2211
  
+ DATA(insert OID = 2800 ( snapshot   PGNSP PGUID -1 f b t \054 0 0 snapshot_in snapshot_out snapshot_recv snapshot_send - d p f 0 -1 0 _null_ _null_ ));
+ #define SNAPSHOTOID   2800
+ DATA(insert OID = 2801 ( _snapshot  PGNSP PGUID -1 f b t \054 0 2800 array_in array_out array_recv array_send - i p f 0 -1 0 _null_ _null_ ));
+ #define SNAPSHOTARRAYOID      2801
+ 
  /*
   * pseudo-types
   *
Index: pgsql/src/include/catalog/pg_cast.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_cast.h
--- pgsql/src/include/catalog/pg_cast.h
*************** DATA(insert ( 1560 1560 1685 i ));
*** 392,395 ****
--- 392,399 ----
  DATA(insert ( 1562 1562 1687 i ));
  DATA(insert ( 1700 1700 1703 i ));
  
+ /* snapshot to/from text */
+ DATA(insert (   25 2800 2803 e ));
+ DATA(insert ( 2800   25 2804 e ));
+ 
  #endif   /* PG_CAST_H */
Index: pgsql/src/bin/pg_dump/pg_dumpall.c
===================================================================
*** pgsql.orig/src/bin/pg_dump/pg_dumpall.c
--- pgsql/src/bin/pg_dump/pg_dumpall.c
*************** static PGconn *connectDatabase(const cha
*** 59,64 ****
--- 59,66 ----
  static PGresult *executeQuery(PGconn *conn, const char *query);
  static void executeCommand(PGconn *conn, const char *query);
  
+ static void dumpTxidEpoch(PGconn *conn);
+ 
  static char pg_dump_bin[MAXPGPATH];
  static PQExpBuffer pgdumpopts;
  static bool output_clean = false;
*************** main(int argc, char *argv[])
*** 349,354 ****
--- 351,360 ----
                if (server_version >= 80000)
                        dumpTablespaces(conn);
  
+               /* Dump txid state */
+               if (server_version >= 80200)
+                       dumpTxidEpoch(conn);
+ 
                /* Dump CREATE DATABASE commands */
                if (!globals_only)
                        dumpCreateDB(conn);
*************** help(void)
*** 410,415 ****
--- 416,442 ----
  }
  
  
+ static void
+ dumpTxidEpoch(PGconn *conn)
+ {
+       long long txid;
+       PGresult *res;
+       int ntups;
+ 
+       res = executeQuery(conn, "SELECT current_txid()");
+       ntups = PQntuples(res);
+       if (ntups != 1)
+       {
+               fprintf(stderr, "ERROR: current_txid() failed\n");
+               exit(1);
+       }
+       txid = atoll(PQgetvalue(res, 0, 0));
+       PQclear(res);
+ 
+       printf("\n--\n-- synchronize txid epoch\n--\n");
+       printf("SELECT pg_sync_txid(%lld);\n\n", txid);
+ }
+ 
  
  /*
   * Dump roles
Index: pgsql/src/test/regress/parallel_schedule
===================================================================
*** pgsql.orig/src/test/regress/parallel_schedule
--- pgsql/src/test/regress/parallel_schedule
*************** test: numerology
*** 12,18 ****
  # ----------
  # The second group of parallel test
  # ----------
! test: point lseg box path polygon circle date time timetz timestamp timestamptz interval abstime reltime tinterval inet comments oidjoins type_sanity opr_sanity
  
  # Depends on point, lseg, box, path, polygon and circle
  test: geometry
--- 12,18 ----
  # ----------
  # The second group of parallel test
  # ----------
! test: point lseg box path polygon circle date time timetz timestamp timestamptz interval abstime reltime tinterval inet comments oidjoins type_sanity opr_sanity txid
  
  # Depends on point, lseg, box, path, polygon and circle
  test: geometry
Index: pgsql/src/test/regress/serial_schedule
===================================================================
*** pgsql.orig/src/test/regress/serial_schedule
--- pgsql/src/test/regress/serial_schedule
*************** test: comments
*** 35,40 ****
--- 35,41 ----
  test: oidjoins
  test: type_sanity
  test: opr_sanity
+ test: txid
  test: geometry
  test: horology
  test: insert
Index: pgsql/src/test/regress/expected/txid.out
===================================================================
*** /dev/null
--- pgsql/src/test/regress/expected/txid.out
***************
*** 0 ****
--- 1,159 ----
+ -- i/o
+ select '12:20:'::snapshot;
+  snapshot 
+ ----------
+  12:20:
+ (1 row)
+ 
+ select '12:15:'::snapshot;
+  snapshot 
+ ----------
+  12:15:
+ (1 row)
+ 
+ -- text conversion
+ select '12:13:'::text::snapshot;
+  snapshot 
+ ----------
+  12:13:
+ (1 row)
+ 
+ select '12:13:'::snapshot::text;
+   text  
+ --------
+  12:13:
+ (1 row)
+ 
+ -- errors
+ select '31:12:'::snapshot;
+ ERROR:  illegal txid_snapshot input format
+ select '0:1:'::snapshot;
+ ERROR:  illegal txid_snapshot input format
+ select '12:13:0'::snapshot;
+ ERROR:  illegal txid_snapshot input format
+ select '12:20:14,13'::snapshot;
+ ERROR:  illegal txid_snapshot input format
+ -- info
+ select snapshot_xmin('1:2:'::snapshot);
+  snapshot_xmin 
+ ---------------
+              1
+ (1 row)
+ 
+ select snapshot_xmax('1:2:'::snapshot);
+  snapshot_xmax 
+ ---------------
+              2
+ (1 row)
+ 
+ select * from snapshot_active_list('1:20:3,4,5,6,7'::snapshot);
+  snapshot_active_list 
+ ----------------------
+                     3
+                     4
+                     5
+                     6
+                     7
+ (5 rows)
+ 
+ -- storage
+ create table snapshot_test (
+       nr      integer,
+       snap    snapshot
+ );
+ insert into snapshot_test values (1, '12:13:');
+ -- small delta
+ insert into snapshot_test values (2, '12:20:13,15,18');
+ -- large delta
+ insert into snapshot_test values (3, '2000000:7000000:3000001,4000002,5000003,6000004');
+ select  snap,
+         snapshot_xmin(snap),
+       snapshot_xmax(snap)
+ from snapshot_test order by nr;
+                       snap                       | snapshot_xmin | snapshot_xmax 
+ -------------------------------------------------+---------------+---------------
+  12:13:                                          |            12 |            13
+  12:20:13,15,18                                  |            12 |            20
+  2000000:7000000:3000001,4000002,5000003,6000004 |       2000000 |       7000000
+ (3 rows)
+ 
+ select id, snapshot_contains(snap, id)
+ from snapshot_test, generate_series(11, 21) id
+ where nr = 2;
+  id | snapshot_contains 
+ ----+-------------------
+  11 | t
+  12 | t
+  13 | f
+  14 | t
+  15 | f
+  16 | t
+  17 | t
+  18 | f
+  19 | t
+  20 | f
+  21 | f
+ (11 rows)
+ 
+ -- test current values also
+ select current_txid() >= snapshot_xmin(current_snapshot());
+  ?column? 
+ ----------
+  t
+ (1 row)
+ 
+ select current_txid() < snapshot_xmax(current_snapshot());
+  ?column? 
+ ----------
+  t
+ (1 row)
+ 
+ select snapshot_contains(current_snapshot(), current_txid());
+  snapshot_contains 
+ -------------------
+  t
+ (1 row)
+ 
+ -- pg_resync
+ select current_txid() >> 32;
+  ?column? 
+ ----------
+         0
+ (1 row)
+ 
+ select pg_sync_txid(400);
+  pg_sync_txid 
+ --------------
+              
+ (1 row)
+ 
+ select current_txid() >> 32;
+  ?column? 
+ ----------
+         0
+ (1 row)
+ 
+ select pg_sync_txid(8589934592);
+  pg_sync_txid 
+ --------------
+              
+ (1 row)
+ 
+ select current_txid() >> 32;
+  ?column? 
+ ----------
+         2
+ (1 row)
+ 
+ select pg_sync_txid(400);
+  pg_sync_txid 
+ --------------
+              
+ (1 row)
+ 
+ select current_txid() >> 32;
+  ?column? 
+ ----------
+         2
+ (1 row)
+ 
Index: pgsql/src/test/regress/sql/txid.sql
===================================================================
*** /dev/null
--- pgsql/src/test/regress/sql/txid.sql
***************
*** 0 ****
--- 1,55 ----
+ 
+ -- i/o
+ select '12:20:'::snapshot;
+ select '12:15:'::snapshot;
+ 
+ -- text conversion
+ select '12:13:'::text::snapshot;
+ select '12:13:'::snapshot::text;
+ 
+ -- errors
+ select '31:12:'::snapshot;
+ select '0:1:'::snapshot;
+ select '12:13:0'::snapshot;
+ select '12:20:14,13'::snapshot;
+ 
+ -- info
+ select snapshot_xmin('1:2:'::snapshot);
+ select snapshot_xmax('1:2:'::snapshot);
+ select * from snapshot_active_list('1:20:3,4,5,6,7'::snapshot);
+ 
+ -- storage
+ create table snapshot_test (
+       nr      integer,
+       snap    snapshot
+ );
+ 
+ insert into snapshot_test values (1, '12:13:');
+ -- small delta
+ insert into snapshot_test values (2, '12:20:13,15,18');
+ -- large delta
+ insert into snapshot_test values (3, '2000000:7000000:3000001,4000002,5000003,6000004');
+ 
+ select  snap,
+         snapshot_xmin(snap),
+       snapshot_xmax(snap)
+ from snapshot_test order by nr;
+ 
+ select id, snapshot_contains(snap, id)
+ from snapshot_test, generate_series(11, 21) id
+ where nr = 2;
+ 
+ -- test current values also
+ select current_txid() >= snapshot_xmin(current_snapshot());
+ select current_txid() < snapshot_xmax(current_snapshot());
+ select snapshot_contains(current_snapshot(), current_txid());
+ 
+ -- pg_resync
+ select current_txid() >> 32;
+ select pg_sync_txid(400);
+ select current_txid() >> 32;
+ select pg_sync_txid(8589934592);
+ select current_txid() >> 32;
+ select pg_sync_txid(400);
+ select current_txid() >> 32;
+ 
Index: pgsql/src/test/regress/output/misc.source
===================================================================
*** pgsql.orig/src/test/regress/output/misc.source
--- pgsql/src/test/regress/output/misc.source
*************** SELECT user_relns() AS user_relns
*** 650,655 ****
--- 650,656 ----
   road
   shighway
   slow_emp4000
+  snapshot_test
   street
   stud_emp
   student
*************** SELECT user_relns() AS user_relns
*** 665,671 ****
   toyemp
   varchar_tbl
   xacttest
! (99 rows)
  
  SELECT name(equipment(hobby_construct(text 'skywalking', text 'mer')));
   name 
--- 666,672 ----
   toyemp
   varchar_tbl
   xacttest
! (100 rows)
  
  SELECT name(equipment(hobby_construct(text 'skywalking', text 'mer')));
   name 