Intro
-----
The following patch exports 8-byte txids and snapshots to the user level,
allowing their use in regular SQL. It is based on the Slony-I xxid
module. It provides a special 'snapshot' type for snapshots but
uses regular int8 for transaction IDs.
Exported API
------------
Type: snapshot
Functions:
current_txid() returns int8
current_snapshot() returns snapshot
snapshot_xmin(snapshot) returns int8
snapshot_xmax(snapshot) returns int8
snapshot_active_list(snapshot) returns setof int8
snapshot_contains(snapshot, int8) returns bool
pg_sync_txid(int8) returns int8
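For reference, the visibility rule behind snapshot_contains() can be sketched in plain C. This is a minimal stand-alone model, not the patch's code: snapshot_sketch and snapshot_sketch_contains are hypothetical names, and the snapshot is assumed to be just xmin, xmax and a sorted array of in-progress txids (the patch sorts xip in txid_current_snapshot()). The decision order matches txid_snapshot_contains(), but bsearch() replaces the patch's linear scan.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-alone model of a txid snapshot. */
typedef struct
{
    uint64_t        xmin;   /* txids below this had finished */
    uint64_t        xmax;   /* txids at or above this had not started */
    size_t          nxip;   /* number of in-progress txids */
    const uint64_t *xip;    /* in-progress txids, sorted ascending */
} snapshot_sketch;

static int
cmp_txid(const void *a, const void *b)
{
    uint64_t x = *(const uint64_t *) a;
    uint64_t y = *(const uint64_t *) b;

    return (x > y) - (x < y);
}

/* Same decision order as txid_snapshot_contains() in the patch. */
static bool
snapshot_sketch_contains(const snapshot_sketch *snap, uint64_t value)
{
    assert(snap->xmin <= snap->xmax);

    if (value < snap->xmin)
        return true;        /* finished before the snapshot was taken */
    if (value >= snap->xmax)
        return false;       /* not yet started when it was taken */
    if (snap->nxip == 0)
        return true;        /* nothing was in progress */

    /* in progress at snapshot time means not visible */
    return bsearch(&value, snap->xip, snap->nxip,
                   sizeof(uint64_t), cmp_txid) == NULL;
}
```

With the regression test's snapshot '12:20:13,15,18', txids 13, 15, 18, 20 and 21 come out not visible and 11, 12, 14, 16, 17 and 19 visible, which matches the expected output in the patch.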
Operation
---------
Extension to 8 bytes is done by keeping track of the wraparound count
(epoch) in pg_control. On every checkpoint, nextXid is compared to the
one stored in pg_control. If the new value is smaller, wraparound has
happened and the epoch is increased.
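The per-checkpoint check can be modelled outside the server. This is a minimal sketch mirroring the patch's update_txid_epoch(), assuming TransactionId is a plain uint32_t; the WARNING the patch emits on epoch overflow is omitted. It can only notice a wraparound if checkpoints happen at least once per 2^32 transactions, otherwise a whole wrap could go unnoticed.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Same fields as the TxidEpoch struct the patch adds to pg_control. */
typedef struct
{
    uint32_t      epoch;      /* number of xid wraparounds seen so far */
    TransactionId epoch_xid;  /* nextXid recorded at the last checkpoint */
} TxidEpoch;

/* Epoch 0x7FFFFFFF is reserved so no txid collides with MAX_INT64. */
#define MAX_EPOCH 0x7FFFFFFEu

/* Called once per checkpoint with that checkpoint's nextXid. */
static void
update_txid_epoch(TxidEpoch *state, TransactionId cur_xid)
{
    assert(state->epoch <= MAX_EPOCH);

    if (cur_xid < state->epoch_xid)
    {
        /* counter went "backwards": the 32-bit xid wrapped around */
        state->epoch++;
        if (state->epoch > MAX_EPOCH)
            state->epoch = 0;   /* the patch also logs a WARNING here */
    }
    state->epoch_xid = cur_xid;
}
```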
When a long txid or snapshot is requested, pg_control is locked with
LW_SHARED to retrieve the epoch value from it. The patch does not
affect core functionality in any other way.
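Once the epoch has been read, a 32-bit xid can be widened to a txid. The sketch below mirrors the patch's convert_xid() under simplifying assumptions: xid_precedes() is a hypothetical stand-in for PostgreSQL's TransactionIdPrecedes() that only handles normal xids, and the constants are copied in rather than taken from access/transam.h. The key point is that an xid may sit just before or just after the wraparound relative to epoch_xid, so the epoch is adjusted by one in either direction.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

typedef struct
{
    uint32_t      epoch;
    TransactionId epoch_xid;
} TxidEpoch;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define MAX_INT64                UINT64_C(0x7FFFFFFFFFFFFFFF)

/* Circular "a is older than b" test, valid for normal xids only. */
static int
xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

static uint64_t
convert_xid(TransactionId xid, const TxidEpoch *state)
{
    uint64_t epoch = state->epoch;

    if (xid == InvalidTransactionId)
        return MAX_INT64;           /* same sentinel as the patch */
    if (xid < FirstNormalTransactionId)
        return xid;                 /* special xids are returned as-is */

    if (xid_precedes(xid, state->epoch_xid))
    {
        /* older than epoch_xid but numerically larger: previous epoch */
        if (xid > state->epoch_xid)
        {
            assert(epoch > 0);
            epoch--;
        }
    }
    else if (xid_precedes(state->epoch_xid, xid))
    {
        /* newer than epoch_xid but numerically smaller: next epoch */
        if (xid < state->epoch_xid)
            epoch++;
    }
    return (epoch << 32) | xid;
}
```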
Backup/restore of txid data
---------------------------
Currently I made pg_dumpall output the following statement:
"SELECT pg_sync_txid(%lld)", current_txid()
Then on the target database, pg_sync_txid checks whether its current
txid (epoch + GetTopTransactionId()) is larger than the given argument.
If not, it bumps the epoch until it is, thus guaranteeing that
newly issued txids are larger than those in the source database. If
restored into the same database instance, nothing will happen.
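The decision pg_sync_txid() makes can be sketched on its own. This mirrors the patch's SyncTxidEpoch() without the ControlFileLock handling and the UpdateControlFile() call; sync_txid_epoch is a hypothetical name, and cur_xid stands in for GetCurrentTransactionId(). Note that it jumps straight to the source epoch and then bumps it at most once, rather than looping.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

typedef struct
{
    uint32_t      epoch;
    TransactionId epoch_xid;
} TxidEpoch;

#define FirstNormalTransactionId ((TransactionId) 3)
#define MAX_EPOCH                0x7FFFFFFEu

/* Raise the epoch so txids issued from now on exceed sync_txid. */
static void
sync_txid_epoch(TxidEpoch *state, TransactionId cur_xid, uint64_t sync_txid)
{
    uint32_t      sync_epoch = (uint32_t) (sync_txid >> 32);
    TransactionId sync_xid = (TransactionId) sync_txid;
    bool          changed = false;

    assert(cur_xid >= FirstNormalTransactionId);

    if (state->epoch < sync_epoch)
    {
        state->epoch = sync_epoch;      /* jump straight to the source epoch */
        changed = true;
    }
    if (state->epoch == sync_epoch && cur_xid <= sync_xid)
    {
        state->epoch++;                 /* same epoch, but we are behind */
        changed = true;
    }
    if (changed)
    {
        if (state->epoch > MAX_EPOCH)
            state->epoch = 0;           /* the patch warns about this */
        state->epoch_xid = cur_xid;
    }
}
```

Assuming a fresh cluster whose current xid is already above 400, feeding it the regression test's arguments (400, then 8589934592, then 400 again) leaves the epoch at 0, 2 and 2, in line with the expected `current_txid() >> 32` output.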
Advantages of 8-byte txids
--------------------------
* Indexes won't break silently. No need for a mandatory periodic
truncate, which may not happen for various reasons.
* Allows keeping values from different databases in one table/index.
* Ability to bring data to a different server and continue there.
Advantages in being in core
---------------------------
* Core code can guarantee that the wraparound check happens at least
once every 2G transactions.
* Core code can update pg_control non-transactionally. A module
needs to operate inside a user transaction when updating the epoch
row, which brings various problems (READ COMMITTED vs. SERIALIZABLE,
long transactions, locking, etc).
* Core code has only one place where it needs to update; a module
needs to have an epoch table in each database.
Todo, to think about
--------------------
* Flesh out the documentation. Probably needs some background.
* Better names for some functions?
* pg_sync_txid allows use of pg_dump for moving a database,
but also adds the possibility to shoot yourself in the foot by allowing
epoch wraparound to happen. Is "Don't do it then" enough?
* Currently txid keeps its own copy of nextXid in pg_control;
this makes the data dependencies clear. It's possible to drop it
and use ->checkPointCopy->nextXid directly, thus saving 4 bytes.
* Should pg_sync_txid() be issued by pg_dump instead of pg_dumpall?
--
marko
Index: pgsql/doc/src/sgml/func.sgml
===================================================================
*** pgsql.orig/doc/src/sgml/func.sgml
--- pgsql/doc/src/sgml/func.sgml
*************** SELECT pg_type_is_visible('myschema.widg
*** 9699,9704 ****
--- 9699,9805 ----
databases within each cluster and their descriptions are stored globally
as well.
</para>
+
+ <para>
+ <xref linkend="functions-info-txid"> shows functions
+ querying transaction IDs and snapshots.
+ </para>
+
+ <table id="functions-info-txid">
+ <title>Transaction state functions</title>
+ <tgroup cols="3">
+ <thead>
+ <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry></row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry><literal><function>current_txid</function>()</literal></entry>
+ <entry><type>int8</type></entry>
+ <entry>transaction ID of current transaction</entry>
+ </row>
+
+ <row>
+ <entry><literal><function>current_snapshot</function>()</literal></entry>
+ <entry><type>snapshot</type></entry>
+ <entry>snapshot for current transaction (or statement)</entry>
+ </row>
+
+ <row>
+ <entry><literal><function>snapshot_xmin</function>(<type>snapshot</type>)</literal></entry>
+ <entry><type>int8</type></entry>
+ <entry>lower bound of the snapshot; all transactions below it are visible in the snapshot</entry>
+ </row>
+
+ <row>
+ <entry><literal><function>snapshot_xmax</function>(<type>snapshot</type>)</literal></entry>
+ <entry><type>int8</type></entry>
+ <entry>upper bound of the snapshot; transactions at or above it are not visible in the snapshot</entry>
+ </row>
+
+ <row>
+ <entry><literal><function>snapshot_active_list</function>(<type>snapshot</type>)</literal></entry>
+ <entry><type>setof int8</type></entry>
+ <entry>txids that were in progress when the snapshot was taken</entry>
+ </row>
+
+ <row>
+ <entry><literal><function>snapshot_contains</function>(<type>snapshot</type>, <type>int8</type>)</literal></entry>
+ <entry><type>bool</type></entry>
+ <entry>checks if txid is visible in snapshot</entry>
+ </row>
+
+ <row>
+ <entry><literal><function>pg_sync_txid</function>(<type>int8</type>)</literal></entry>
+ <entry><type>int8</type></entry>
+ <entry>makes sure that txids assigned from now on will be greater than the given argument</entry>
+ </row>
+
+ </tbody>
+ </tgroup>
+ </table>
+
+ <indexterm zone="functions-info">
+ <primary>txid</primary>
+ <secondary>current</secondary>
+ </indexterm>
+
+ <indexterm zone="functions-info">
+ <primary>snapshot</primary>
+ <secondary>current</secondary>
+ </indexterm>
+
+ <indexterm zone="functions-info">
+ <primary>current_txid</primary>
+ </indexterm>
+ <indexterm zone="functions-info">
+ <primary>current_snapshot</primary>
+ </indexterm>
+ <indexterm zone="functions-info">
+ <primary>snapshot_xmin</primary>
+ </indexterm>
+ <indexterm zone="functions-info">
+ <primary>snapshot_xmax</primary>
+ </indexterm>
+ <indexterm zone="functions-info">
+ <primary>snapshot_contains</primary>
+ </indexterm>
+ <indexterm zone="functions-info">
+ <primary>snapshot_active_list</primary>
+ </indexterm>
+ <indexterm zone="functions-info">
+ <primary>pg_sync_txid</primary>
+ </indexterm>
+
+ <para>
+ A snapshot consists of xmin, xmax and the list of transactions that were still running when it was taken.
+ </para>
+ <note>
+ <para>
+ TODO: describe usage.
+ </para>
+ </note>
+
</sect1>
<sect1 id="functions-admin">
Index: pgsql/src/include/access/xlogutils.h
===================================================================
*** pgsql.orig/src/include/access/xlogutils.h
--- pgsql/src/include/access/xlogutils.h
***************
*** 13,18 ****
--- 13,19 ----
#include "storage/buf.h"
#include "utils/rel.h"
+ #include "catalog/pg_control.h"
extern void XLogInitRelationCache(void);
*************** extern void XLogTruncateRelation(RelFile
*** 26,29 ****
--- 27,33 ----
extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init);
+ extern void GetTxidEpoch(TxidEpoch *dst);
+ extern void SyncTxidEpoch(uint64 sync_txid);
+
#endif
Index: pgsql/src/include/catalog/pg_control.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_control.h
--- pgsql/src/include/catalog/pg_control.h
*************** typedef enum DBState
*** 62,67 ****
--- 62,75 ----
#define LOCALE_NAME_BUFLEN 128
/*
+ * Long txid in expanded form.
+ */
+ typedef struct {
+ uint32 epoch; /* epoch value */
+ TransactionId epoch_xid; /* corresponding xid */
+ } TxidEpoch;
+
+ /*
* Contents of pg_control.
*
* NOTE: try to keep this under 512 bytes so that it will fit on one physical
*************** typedef struct ControlFileData
*** 143,148 ****
--- 151,159 ----
char lc_collate[LOCALE_NAME_BUFLEN];
char lc_ctype[LOCALE_NAME_BUFLEN];
+ /* external txid tracking */
+ TxidEpoch txid_epoch;
+
/* CRC of all above ... MUST BE LAST! */
pg_crc32 crc;
} ControlFileData;
Index: pgsql/src/include/utils/builtins.h
===================================================================
*** pgsql.orig/src/include/utils/builtins.h
--- pgsql/src/include/utils/builtins.h
*************** extern Datum pg_prepared_statement(PG_FU
*** 886,889 ****
--- 886,904 ----
/* utils/mmgr/portalmem.c */
extern Datum pg_cursor(PG_FUNCTION_ARGS);
+ /* utils/adt/txid.c */
+ extern Datum txid_current_txid(PG_FUNCTION_ARGS);
+ extern Datum txid_current_snapshot(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_in(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_out(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_recv(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_send(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_contains(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_xmin(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_xmax(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_active_list(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_from_text(PG_FUNCTION_ARGS);
+ extern Datum txid_snapshot_to_text(PG_FUNCTION_ARGS);
+ extern Datum pg_sync_txid(PG_FUNCTION_ARGS);
+
#endif /* BUILTINS_H */
Index: pgsql/src/backend/access/transam/xlog.c
===================================================================
*** pgsql.orig/src/backend/access/transam/xlog.c
--- pgsql/src/backend/access/transam/xlog.c
*************** static void xlog_outrec(StringInfo buf,
*** 501,506 ****
--- 501,507 ----
static bool read_backup_label(XLogRecPtr *checkPointLoc);
static void remove_backup_label(void);
static void rm_redo_error_callback(void *arg);
+ static void update_txid_epoch(TxidEpoch *state, TransactionId cur_xid);
/*
*************** CreateCheckPoint(bool shutdown, bool for
*** 5378,5383 ****
--- 5379,5387 ----
ControlFile->checkPoint = ProcLastRecPtr;
ControlFile->checkPointCopy = checkPoint;
ControlFile->time = time(NULL);
+
+ update_txid_epoch(&ControlFile->txid_epoch, checkPoint.nextXid);
+
UpdateControlFile();
LWLockRelease(ControlFileLock);
*************** rm_redo_error_callback(void *arg)
*** 6143,6145 ****
--- 6147,6230 ----
pfree(buf.data);
}
+
+ /*
+ * As MAX_INT64 is used to represent InvalidTransactionId,
+ * don't allow 0x7FFFFFFF as an epoch value, so that no valid
+ * transaction can be mapped to it.
+ */
+ #define MAX_EPOCH 0x7FFFFFFE
+
+ /*
+ * Per-checkpoint refresh of pg_control values.
+ *
+ * Increases epoch if wraparound happened.
+ */
+ static void update_txid_epoch(TxidEpoch *state, TransactionId cur_xid)
+ {
+ if (cur_xid < state->epoch_xid)
+ {
+ state->epoch++;
+ if (state->epoch > MAX_EPOCH)
+ {
+ elog(WARNING, "txid epoch wraparound");
+ state->epoch = 0;
+ }
+ }
+ state->epoch_xid = cur_xid;
+ }
+
+ /*
+ * Return current txid_epoch.
+ */
+ void GetTxidEpoch(TxidEpoch *dst)
+ {
+ LWLockAcquire(ControlFileLock, LW_SHARED);
+ *dst = ControlFile->txid_epoch;
+ LWLockRelease(ControlFileLock);
+ }
+
+ /*
+ * Checks whether newly issued txids are greater than txid_sync.
+ * Increases the epoch if needed.
+ */
+ void SyncTxidEpoch(uint64 txid_sync)
+ {
+ TransactionId cur_xid = GetCurrentTransactionId();
+ TransactionId sync_xid = (TransactionId)txid_sync;
+ uint32 sync_epoch = txid_sync >> 32;
+ int change = 0;
+ TxidEpoch *state;
+
+ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+ state = &ControlFile->txid_epoch;
+
+ if (state->epoch < sync_epoch)
+ {
+ state->epoch = sync_epoch;
+ change = 1;
+ }
+ if (state->epoch == sync_epoch)
+ {
+ if (cur_xid <= sync_xid)
+ {
+ state->epoch++;
+ change = 1;
+ }
+ }
+
+ if (change)
+ {
+ if (state->epoch > MAX_EPOCH)
+ {
+ elog(WARNING, "txid epoch wraparound");
+ state->epoch = 0;
+ }
+ state->epoch_xid = cur_xid;
+ UpdateControlFile();
+ }
+
+ LWLockRelease(ControlFileLock);
+ }
+
Index: pgsql/src/backend/utils/adt/txid.c
===================================================================
*** /dev/null
--- pgsql/src/backend/utils/adt/txid.c
***************
*** 0 ****
--- 1,507 ----
+ /*-------------------------------------------------------------------------
+ * txid.c
+ *
+ * Safe handling of transaction ID's.
+ *
+ * Copyright (c) 2003-2004, PostgreSQL Global Development Group
+ * Author: Jan Wieck, Afilias USA INC.
+ *
+ * Extend to 8-byte: Marko Kreen, Skype Technologies
+ *-------------------------------------------------------------------------
+ */
+
+ #include "postgres.h"
+
+ #include <limits.h>
+
+ #include "access/xact.h"
+ #include "access/transam.h"
+ #include "executor/spi.h"
+ #include "libpq/pqformat.h"
+ #include "miscadmin.h"
+ #include "utils/array.h"
+ #include "utils/lsyscache.h"
+ #include "catalog/catversion.h"
+ #include "catalog/pg_control.h"
+ #include "funcapi.h"
+
+ #ifdef INT64_IS_BUSTED
+ #error txid needs working int64
+ #endif
+
+ #define MAX_INT64 0x7FFFFFFFFFFFFFFFLL
+
+ /* Use unsigned variant internally */
+ typedef uint64 txid;
+
+ /*
+ * In-memory representation of snapshot.
+ */
+ typedef struct
+ {
+ int32 varsz;
+ uint32 nxip;
+ txid xmin;
+ txid xmax;
+ txid xip[1];
+ } txid_snapshot;
+
+ /*
+ * txid_snapshot_active_list needs to remember
+ * state between function calls
+ */
+ struct snap_state {
+ int pos;
+ txid_snapshot *snap;
+ };
+
+ /*
+ * do a TransactionId -> txid conversion
+ */
+ static txid convert_xid(TransactionId xid, const TxidEpoch *state)
+ {
+ uint64 epoch;
+
+ /* avoid issues with the special meaning of 0 */
+ if (xid == InvalidTransactionId)
+ return MAX_INT64;
+
+ /* return special xid's as-is */
+ if (xid < FirstNormalTransactionId)
+ return xid;
+
+ /* xid can be on either side of the wraparound point */
+ epoch = state->epoch;
+ if (TransactionIdPrecedes(xid, state->epoch_xid)) {
+ if (xid > state->epoch_xid)
+ epoch--;
+ } else if (TransactionIdFollows(xid, state->epoch_xid)) {
+ if (xid < state->epoch_xid)
+ epoch++;
+ }
+ return (epoch << 32) | xid;
+ }
+
+ static int _cmp_txid(const void *aa, const void *bb)
+ {
+ const uint64 *a = aa;
+ const uint64 *b = bb;
+ if (*a < *b)
+ return -1;
+ if (*a > *b)
+ return 1;
+ return 0;
+ }
+
+ static void sort_snapshot(txid_snapshot *snap)
+ {
+ qsort(snap->xip, snap->nxip, sizeof(txid), _cmp_txid);
+ }
+
+ /*
+ * Convert a cstring to txid_snapshot
+ */
+ static txid_snapshot *
+ parse_snapshot(const char *str)
+ {
+ int a_size;
+ txid *xip;
+
+ int a_used = 0;
+ txid xmin;
+ txid xmax;
+ txid last_val = 0, val;
+ txid_snapshot *snap;
+ int size;
+
+ char *endp;
+
+ a_size = 1024;
+ xip = (txid *) palloc(sizeof(txid) * a_size);
+
+ xmin = (txid) strtoull(str, &endp, 0);
+ if (*endp != ':')
+ elog(ERROR, "illegal txid_snapshot input format");
+ str = endp + 1;
+
+ xmax = (txid) strtoull(str, &endp, 0);
+ if (*endp != ':')
+ elog(ERROR, "illegal txid_snapshot input format");
+ str = endp + 1;
+
+ /* it should look sane */
+ if (xmin >= xmax || xmin > MAX_INT64 || xmax > MAX_INT64
+ || xmin == 0 || xmax == 0)
+ elog(ERROR, "illegal txid_snapshot input format");
+
+ while (*str != '\0')
+ {
+ if (a_used >= a_size)
+ {
+ a_size *= 2;
+ xip = (txid *) repalloc(xip, sizeof(txid) * a_size);
+ }
+
+ /* read next value */
+ if (*str == '\'')
+ {
+ str++;
+ val = (txid) strtoull(str, &endp, 0);
+ if (*endp != '\'')
+ elog(ERROR, "illegal txid_snapshot input format");
+ str = endp + 1;
+ }
+ else
+ {
+ val = (txid) strtoull(str, &endp, 0);
+ str = endp;
+ }
+
+ /* require the input to be in order */
+ if (val < xmin || val <= last_val || val >= xmax)
+ elog(ERROR, "illegal txid_snapshot input format");
+
+ xip[a_used++] = val;
+ last_val = val;
+
+ if (*str == ',')
+ str++;
+ else
+ {
+ if (*str != '\0')
+ elog(ERROR, "illegal txid_snapshot input format");
+ }
+ }
+
+ size = offsetof(txid_snapshot, xip) + sizeof(txid) * a_used;
+ snap = (txid_snapshot *) palloc(size);
+ snap->varsz = size;
+ snap->xmin = xmin;
+ snap->xmax = xmax;
+ snap->nxip = a_used;
+ if (a_used > 0)
+ memcpy(&(snap->xip[0]), xip, sizeof(txid) * a_used);
+ pfree(xip);
+
+ return snap;
+ }
+
+ static txid
+ get_current_txid()
+ {
+ TxidEpoch state;
+ GetTxidEpoch(&state);
+ return convert_xid(GetTopTransactionId(), &state);
+ }
+
+ /*
+ * Return the current transaction ID as txid
+ */
+ Datum
+ txid_current_txid(PG_FUNCTION_ARGS)
+ {
+ PG_RETURN_INT64(get_current_txid());
+ }
+
+ /*
+ * Return current snapshot
+ */
+ Datum
+ txid_current_snapshot(PG_FUNCTION_ARGS)
+ {
+ txid_snapshot *snap;
+ unsigned num, i, size;
+ TxidEpoch state;
+
+ if (SerializableSnapshot == NULL)
+ elog(ERROR, "current_snapshot: SerializableSnapshot == NULL");
+
+ GetTxidEpoch(&state);
+
+ num = SerializableSnapshot->xcnt;
+ size = offsetof(txid_snapshot, xip) + sizeof(txid) * num;
+ snap = palloc(size);
+ snap->varsz = size;
+ snap->xmin = convert_xid(SerializableSnapshot->xmin, &state);
+ snap->xmax = convert_xid(SerializableSnapshot->xmax, &state);
+ snap->nxip = num;
+ for (i = 0; i < num; i++)
+ snap->xip[i] = convert_xid(SerializableSnapshot->xip[i], &state);
+
+ /* we want guaranteed ascending order */
+ sort_snapshot(snap);
+
+ PG_RETURN_POINTER(snap);
+ }
+
+ /*
+ * Return snapshot's xmin
+ */
+ Datum
+ txid_snapshot_xmin(PG_FUNCTION_ARGS)
+ {
+ txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+ int64 res = snap->xmin;
+
+ PG_FREE_IF_COPY(snap, 0);
+ PG_RETURN_INT64(res);
+ }
+
+ /*
+ * Return snapshot's xmax
+ */
+ Datum
+ txid_snapshot_xmax(PG_FUNCTION_ARGS)
+ {
+ txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+ int64 res = snap->xmax;
+
+ PG_FREE_IF_COPY(snap, 0);
+ PG_RETURN_INT64(res);
+ }
+
+ /*
+ * Returns the txids that were in progress in the snapshot.
+ */
+ Datum
+ txid_snapshot_active_list(PG_FUNCTION_ARGS)
+ {
+ FuncCallContext *fctx;
+ struct snap_state *state;
+
+ if (SRF_IS_FIRSTCALL()) {
+ txid_snapshot *snap;
+ int statelen;
+
+ snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+
+ fctx = SRF_FIRSTCALL_INIT();
+ statelen = sizeof(*state) + snap->varsz;
+ state = MemoryContextAlloc(fctx->multi_call_memory_ctx, statelen);
+ state->pos = 0;
+ state->snap = (txid_snapshot *)((char *)state + sizeof(*state));
+ memcpy(state->snap, snap, snap->varsz);
+ fctx->user_fctx = state;
+ PG_FREE_IF_COPY(snap, 0);
+ }
+ fctx = SRF_PERCALL_SETUP();
+ state = fctx->user_fctx;
+ if (state->pos < state->snap->nxip) {
+ Datum res = Int64GetDatum(state->snap->xip[state->pos]);
+ state->pos++;
+ SRF_RETURN_NEXT(fctx, res);
+ } else {
+ SRF_RETURN_DONE(fctx);
+ }
+ }
+
+ /*
+ * input function for type txid_snapshot
+ */
+ Datum
+ txid_snapshot_in(PG_FUNCTION_ARGS)
+ {
+ txid_snapshot *snap;
+ char *str = PG_GETARG_CSTRING(0);
+
+ snap = parse_snapshot(str);
+ PG_RETURN_POINTER(snap);
+ }
+
+ /*
+ * output function for type txid_snapshot
+ */
+ Datum
+ txid_snapshot_out(PG_FUNCTION_ARGS)
+ {
+ txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+
+ char *str = palloc(60 + snap->nxip * 30);
+ char *cp = str;
+ int i;
+
+ snprintf(str, 60, "%llu:%llu:",
+ (unsigned long long)snap->xmin,
+ (unsigned long long)snap->xmax);
+ cp = str + strlen(str);
+
+ for (i = 0; i < snap->nxip; i++)
+ {
+ snprintf(cp, 30, "%llu%s",
+ (unsigned long long)snap->xip[i],
+ (i < snap->nxip - 1) ? "," : "");
+ cp += strlen(cp);
+ }
+
+ PG_FREE_IF_COPY(snap, 0);
+
+ PG_RETURN_CSTRING(str);
+ }
+
+ /*
+ * convert text to txid_snapshot
+ */
+ Datum
+ txid_snapshot_from_text(PG_FUNCTION_ARGS)
+ {
+ text *txt = PG_GETARG_TEXT_P(0);
+ txid_snapshot *snap;
+ char *str;
+ int len;
+
+ len = VARSIZE(txt) - VARHDRSZ;
+ str = palloc(len + 1);
+ memcpy(str, VARDATA(txt), len);
+ str[len] = 0;
+
+ snap = parse_snapshot(str);
+
+ pfree(str);
+ PG_FREE_IF_COPY(txt, 0);
+
+ PG_RETURN_POINTER(snap);
+ }
+
+ /*
+ * convert txid_snapshot to text
+ */
+ Datum
+ txid_snapshot_to_text(PG_FUNCTION_ARGS)
+ {
+ txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+
+ text *res = palloc(VARHDRSZ + 60 + snap->nxip * 30);
+ char *str = VARDATA(res);
+ char *cp = str;
+ int i;
+
+ snprintf(str, 60, "%llu:%llu:",
+ (unsigned long long)snap->xmin,
+ (unsigned long long)snap->xmax);
+ cp = str + strlen(str);
+
+ for (i = 0; i < snap->nxip; i++)
+ {
+ snprintf(cp, 30, "%llu%s",
+ (unsigned long long)snap->xip[i],
+ (i < snap->nxip - 1) ? "," : "");
+ cp += strlen(cp);
+ }
+
+ VARATT_SIZEP(res) = VARHDRSZ + cp - str;
+
+ PG_FREE_IF_COPY(snap, 0);
+
+ PG_RETURN_TEXT_P(res);
+ }
+
+ /*
+ * read binary representation
+ */
+ Datum
+ txid_snapshot_recv(PG_FUNCTION_ARGS)
+ {
+ StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+ txid_snapshot *snap;
+ unsigned i, count, size;
+ txid val;
+
+ count = pq_getmsgint(buf, 4);
+ size = offsetof(txid_snapshot, xip) + sizeof(txid) * count;
+ snap = palloc(size);
+
+ snap->varsz = size;
+ snap->nxip = count;
+ snap->xmin = pq_getmsgint64(buf);
+ snap->xmax = pq_getmsgint64(buf);
+ val = snap->xmin;
+ for (i = 0; i < count; i++) {
+ unsigned delta;
+ delta = pq_getmsgint(buf, 2);
+ if (delta & 0x8000)
+ val += ((delta & 0x7FFF) << 16) + pq_getmsgint(buf, 2);
+ else
+ val += delta;
+ if (val < snap->xmin || val > snap->xmax)
+ elog(ERROR, "corrupt snapshot data");
+ snap->xip[i] = val;
+ }
+
+ PG_RETURN_POINTER(snap);
+ }
+
+ /*
+ * binary storage
+ */
+ Datum
+ txid_snapshot_send(PG_FUNCTION_ARGS)
+ {
+ int i;
+ txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+ StringInfoData buf;
+ txid val;
+
+ pq_begintypsend(&buf);
+ pq_sendint(&buf, snap->nxip, 4);
+ pq_sendint64(&buf, snap->xmin);
+ pq_sendint64(&buf, snap->xmax);
+ val = snap->xmin;
+ for (i = 0; i < snap->nxip; i++) {
+ unsigned delta = (unsigned)(snap->xip[i] - val);
+ val = snap->xip[i];
+ if (delta > 0x7FFF) {
+ pq_sendint(&buf, (delta >> 16) | 0x8000, 2);
+ pq_sendint(&buf, delta & 0xFFFF, 2);
+ } else {
+ pq_sendint(&buf, delta, 2);
+ }
+ }
+
+ PG_FREE_IF_COPY(snap, 0);
+
+ PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+ }
+
+ /*
+ * Checks whether txid is visible in the snapshot.
+ */
+ Datum
+ txid_snapshot_contains(PG_FUNCTION_ARGS)
+ {
+ txid_snapshot *snap = (txid_snapshot *) PG_GETARG_VARLENA_P(0);
+ txid value = PG_GETARG_INT64(1);
+ int i;
+
+ if (value < snap->xmin)
+ PG_RETURN_BOOL(true);
+
+ if (value >= snap->xmax)
+ PG_RETURN_BOOL(false);
+
+ for (i = 0; i < snap->nxip; i++)
+ {
+ if (value == snap->xip[i])
+ PG_RETURN_BOOL(false);
+ }
+
+ PG_FREE_IF_COPY(snap, 0);
+
+ PG_RETURN_BOOL(true);
+ }
+
+ /*
+ * Bump epoch to make following txids greater than sync_txid.
+ */
+ Datum
+ pg_sync_txid(PG_FUNCTION_ARGS)
+ {
+ int64 sync_txid = PG_GETARG_INT64(0);
+
+ if (!superuser())
+ elog(ERROR, "must be superuser to use pg_sync_txid");
+
+ SyncTxidEpoch(sync_txid);
+
+ PG_RETURN_INT64(get_current_txid());
+ }
+
Index: pgsql/src/backend/utils/adt/Makefile
===================================================================
*** pgsql.orig/src/backend/utils/adt/Makefile
--- pgsql/src/backend/utils/adt/Makefile
*************** OBJS = acl.o arrayfuncs.o array_userfunc
*** 25,31 ****
tid.o timestamp.o varbit.o varchar.o varlena.o version.o xid.o \
network.o mac.o inet_net_ntop.o inet_net_pton.o \
ri_triggers.o pg_lzcompress.o pg_locale.o formatting.o \
! ascii.o quote.o pgstatfuncs.o encode.o dbsize.o genfile.o
like.o: like.c like_match.c
--- 25,31 ----
tid.o timestamp.o varbit.o varchar.o varlena.o version.o xid.o \
network.o mac.o inet_net_ntop.o inet_net_pton.o \
ri_triggers.o pg_lzcompress.o pg_locale.o formatting.o \
! ascii.o quote.o pgstatfuncs.o encode.o dbsize.o genfile.o txid.o
like.o: like.c like_match.c
Index: pgsql/src/include/catalog/pg_proc.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_proc.h
--- pgsql/src/include/catalog/pg_proc.h
*************** DESCR("List all files in a directory");
*** 3086,3091 ****
--- 3086,3118 ----
DATA(insert OID = 2626 ( pg_sleep PGNSP PGUID 12 f f t f v 1 2278 "701" _null_ _null_ _null_ pg_sleep - _null_ ));
DESCR("Sleep for the specified time in seconds");
+ DATA(insert OID = 2789 ( current_txid PGNSP PGUID 12 f f t f v 0 20 "" _null_ _null_ _null_ txid_current_txid - _null_ ));
+ DESCR("Get current top transaction id");
+ DATA(insert OID = 2790 ( current_snapshot PGNSP PGUID 12 f f t f v 0 2800 "" _null_ _null_ _null_ txid_current_snapshot - _null_ ));
+ DESCR("Returns current snapshot.");
+
+ DATA(insert OID = 2791 ( snapshot_in PGNSP PGUID 12 f f t f i 1 2800 "2275" _null_ _null_ _null_ txid_snapshot_in - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2792 ( snapshot_out PGNSP PGUID 12 f f t f i 1 2275 "2800" _null_ _null_ _null_ txid_snapshot_out - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2793 ( snapshot_recv PGNSP PGUID 12 f f t f i 1 2800 "2281" _null_ _null_ _null_ txid_snapshot_recv - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2794 ( snapshot_send PGNSP PGUID 12 f f t f i 1 17 "2800" _null_ _null_ _null_ txid_snapshot_send - _null_ ));
+ DESCR("I/O");
+ DATA(insert OID = 2795 ( snapshot_xmin PGNSP PGUID 12 f f t f i 1 20 "2800" _null_ _null_ _null_ txid_snapshot_xmin - _null_ ));
+ DESCR("Returns snapshot's xmin.");
+ DATA(insert OID = 2796 ( snapshot_xmax PGNSP PGUID 12 f f t f i 1 20 "2800" _null_ _null_ _null_ txid_snapshot_xmax - _null_ ));
+ DESCR("Returns snapshot's xmax.");
+ DATA(insert OID = 2797 ( snapshot_contains PGNSP PGUID 12 f f t f i 2 16 "2800 20" _null_ _null_ _null_ txid_snapshot_contains - _null_ ));
+ DESCR("Checks whether txid is in snapshot.");
+ DATA(insert OID = 2798 ( snapshot_active_list PGNSP PGUID 12 f f t t i 1 20 "2800" _null_ _null_ _null_ txid_snapshot_active_list - _null_ ));
+ DESCR("Active transactions between snapshot xmin and xmax.");
+ DATA(insert OID = 2799 ( pg_sync_txid PGNSP PGUID 12 f f t f v 1 20 "20" _null_ _null_ _null_ pg_sync_txid - _null_ ));
+ DESCR("Synchronizes epoch so that new txids will be larger than the given txid");
+ DATA(insert OID = 2803 ( snapshot PGNSP PGUID 12 f f t f i 1 2800 "25" _null_ _null_ _null_ txid_snapshot_from_text - _null_ ));
+ DESCR("Converts text to snapshot");
+ DATA(insert OID = 2804 ( text PGNSP PGUID 12 f f t f i 1 25 "2800" _null_ _null_ _null_ txid_snapshot_to_text - _null_ ));
+ DESCR("Converts snapshot to text");
/* Aggregates (moved here from pg_aggregate for 7.3) */
Index: pgsql/src/include/catalog/pg_type.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_type.h
--- pgsql/src/include/catalog/pg_type.h
*************** DATA(insert OID = 2210 ( _regclass PG
*** 518,523 ****
--- 518,528 ----
DATA(insert OID = 2211 ( _regtype PGNSP PGUID -1 f b t \054 0 2206
array_in array_out array_recv array_send - i x f 0 -1 0 _null_ _null_ ));
#define REGTYPEARRAYOID 2211
+ DATA(insert OID = 2800 ( snapshot PGNSP PGUID -1 f b t \054 0 0 snapshot_in snapshot_out snapshot_recv snapshot_send - d p f 0 -1 0 _null_ _null_ ));
+ #define SNAPSHOTOID 2800
+ DATA(insert OID = 2801 ( _snapshot PGNSP PGUID -1 f b t \054 0 2800 array_in array_out array_recv array_send - i p f 0 -1 0 _null_ _null_ ));
+ #define SNAPSHOTARRAYOID 2801
+
/*
* pseudo-types
*
Index: pgsql/src/include/catalog/pg_cast.h
===================================================================
*** pgsql.orig/src/include/catalog/pg_cast.h
--- pgsql/src/include/catalog/pg_cast.h
*************** DATA(insert ( 1560 1560 1685 i ));
*** 392,395 ****
--- 392,399 ----
DATA(insert ( 1562 1562 1687 i ));
DATA(insert ( 1700 1700 1703 i ));
+ /* snapshot to/from text */
+ DATA(insert ( 25 2800 2803 e ));
+ DATA(insert ( 2800 25 2804 e ));
+
#endif /* PG_CAST_H */
Index: pgsql/src/bin/pg_dump/pg_dumpall.c
===================================================================
*** pgsql.orig/src/bin/pg_dump/pg_dumpall.c
--- pgsql/src/bin/pg_dump/pg_dumpall.c
*************** static PGconn *connectDatabase(const cha
*** 59,64 ****
--- 59,66 ----
static PGresult *executeQuery(PGconn *conn, const char *query);
static void executeCommand(PGconn *conn, const char *query);
+ static void dumpTxidEpoch(PGconn *conn);
+
static char pg_dump_bin[MAXPGPATH];
static PQExpBuffer pgdumpopts;
static bool output_clean = false;
*************** main(int argc, char *argv[])
*** 349,354 ****
--- 351,360 ----
if (server_version >= 80000)
dumpTablespaces(conn);
+ /* Dump txid state */
+ if (server_version >= 80200)
+ dumpTxidEpoch(conn);
+
/* Dump CREATE DATABASE commands */
if (!globals_only)
dumpCreateDB(conn);
*************** help(void)
*** 410,415 ****
--- 416,442 ----
}
+ static void
+ dumpTxidEpoch(PGconn *conn)
+ {
+ long long txid;
+ PGresult *res;
+ int ntups;
+
+ res = executeQuery(conn, "SELECT current_txid()");
+ ntups = PQntuples(res);
+ if (ntups != 1)
+ {
+ fprintf(stderr, "ERROR: current_txid() failed\n");
+ exit(1);
+ }
+ txid = atoll(PQgetvalue(res, 0, 0));
+ PQclear(res);
+
+ printf("\n--\n-- synchronize txid epoch\n--\n");
+ printf("SELECT pg_sync_txid(%lld);\n\n", txid);
+ }
+
/*
* Dump roles
Index: pgsql/src/test/regress/parallel_schedule
===================================================================
*** pgsql.orig/src/test/regress/parallel_schedule
--- pgsql/src/test/regress/parallel_schedule
*************** test: numerology
*** 12,18 ****
# ----------
# The second group of parallel test
# ----------
! test: point lseg box path polygon circle date time timetz timestamp timestamptz interval abstime reltime tinterval inet comments oidjoins type_sanity opr_sanity
# Depends on point, lseg, box, path, polygon and circle
test: geometry
--- 12,18 ----
# ----------
# The second group of parallel test
# ----------
! test: point lseg box path polygon circle date time timetz timestamp timestamptz interval abstime reltime tinterval inet comments oidjoins type_sanity opr_sanity txid
# Depends on point, lseg, box, path, polygon and circle
test: geometry
Index: pgsql/src/test/regress/serial_schedule
===================================================================
*** pgsql.orig/src/test/regress/serial_schedule
--- pgsql/src/test/regress/serial_schedule
*************** test: comments
*** 35,40 ****
--- 35,41 ----
test: oidjoins
test: type_sanity
test: opr_sanity
+ test: txid
test: geometry
test: horology
test: insert
Index: pgsql/src/test/regress/expected/txid.out
===================================================================
*** /dev/null
--- pgsql/src/test/regress/expected/txid.out
***************
*** 0 ****
--- 1,159 ----
+ -- i/o
+ select '12:20:'::snapshot;
+ snapshot
+ ----------
+ 12:20:
+ (1 row)
+
+ select '12:15:'::snapshot;
+ snapshot
+ ----------
+ 12:15:
+ (1 row)
+
+ -- text conversion
+ select '12:13:'::text::snapshot;
+ snapshot
+ ----------
+ 12:13:
+ (1 row)
+
+ select '12:13:'::snapshot::text;
+ text
+ --------
+ 12:13:
+ (1 row)
+
+ -- errors
+ select '31:12:'::snapshot;
+ ERROR: illegal txid_snapshot input format
+ select '0:1:'::snapshot;
+ ERROR: illegal txid_snapshot input format
+ select '12:13:0'::snapshot;
+ ERROR: illegal txid_snapshot input format
+ select '12:20:14,13'::snapshot;
+ ERROR: illegal txid_snapshot input format
+ -- info
+ select snapshot_xmin('1:2:'::snapshot);
+ snapshot_xmin
+ ---------------
+ 1
+ (1 row)
+
+ select snapshot_xmax('1:2:'::snapshot);
+ snapshot_xmax
+ ---------------
+ 2
+ (1 row)
+
+ select * from snapshot_active_list('1:20:3,4,5,6,7'::snapshot);
+ snapshot_active_list
+ ----------------------
+ 3
+ 4
+ 5
+ 6
+ 7
+ (5 rows)
+
+ -- storage
+ create table snapshot_test (
+ nr integer,
+ snap snapshot
+ );
+ insert into snapshot_test values (1, '12:13:');
+ -- small delta
+ insert into snapshot_test values (2, '12:20:13,15,18');
+ -- large delta
+ insert into snapshot_test values (3, '2000000:7000000:3000001,4000002,5000003,6000004');
+ select snap,
+ snapshot_xmin(snap),
+ snapshot_xmax(snap)
+ from snapshot_test order by nr;
+ snap | snapshot_xmin | snapshot_xmax
+ -------------------------------------------------+---------------+---------------
+ 12:13: | 12 | 13
+ 12:20:13,15,18 | 12 | 20
+ 2000000:7000000:3000001,4000002,5000003,6000004 | 2000000 | 7000000
+ (3 rows)
+
+ select id, snapshot_contains(snap, id)
+ from snapshot_test, generate_series(11, 21) id
+ where nr = 2;
+ id | snapshot_contains
+ ----+-------------------
+ 11 | t
+ 12 | t
+ 13 | f
+ 14 | t
+ 15 | f
+ 16 | t
+ 17 | t
+ 18 | f
+ 19 | t
+ 20 | f
+ 21 | f
+ (11 rows)
+
+ -- test current values also
+ select current_txid() >= snapshot_xmin(current_snapshot());
+ ?column?
+ ----------
+ t
+ (1 row)
+
+ select current_txid() < snapshot_xmax(current_snapshot());
+ ?column?
+ ----------
+ t
+ (1 row)
+
+ select snapshot_contains(current_snapshot(), current_txid());
+ snapshot_contains
+ -------------------
+ t
+ (1 row)
+
+ -- pg_resync
+ select current_txid() >> 32;
+ ?column?
+ ----------
+ 0
+ (1 row)
+
+ select pg_sync_txid(400);
+ pg_sync_txid
+ --------------
+
+ (1 row)
+
+ select current_txid() >> 32;
+ ?column?
+ ----------
+ 0
+ (1 row)
+
+ select pg_sync_txid(8589934592);
+ pg_sync_txid
+ --------------
+
+ (1 row)
+
+ select current_txid() >> 32;
+ ?column?
+ ----------
+ 2
+ (1 row)
+
+ select pg_sync_txid(400);
+ pg_sync_txid
+ --------------
+
+ (1 row)
+
+ select current_txid() >> 32;
+ ?column?
+ ----------
+ 2
+ (1 row)
+
Index: pgsql/src/test/regress/sql/txid.sql
===================================================================
*** /dev/null
--- pgsql/src/test/regress/sql/txid.sql
***************
*** 0 ****
--- 1,55 ----
+
+ -- i/o
+ select '12:20:'::snapshot;
+ select '12:15:'::snapshot;
+
+ -- text conversion
+ select '12:13:'::text::snapshot;
+ select '12:13:'::snapshot::text;
+
+ -- errors
+ select '31:12:'::snapshot;
+ select '0:1:'::snapshot;
+ select '12:13:0'::snapshot;
+ select '12:20:14,13'::snapshot;
+
+ -- info
+ select snapshot_xmin('1:2:'::snapshot);
+ select snapshot_xmax('1:2:'::snapshot);
+ select * from snapshot_active_list('1:20:3,4,5,6,7'::snapshot);
+
+ -- storage
+ create table snapshot_test (
+ nr integer,
+ snap snapshot
+ );
+
+ insert into snapshot_test values (1, '12:13:');
+ -- small delta
+ insert into snapshot_test values (2, '12:20:13,15,18');
+ -- large delta
+ insert into snapshot_test values (3, '2000000:7000000:3000001,4000002,5000003,6000004');
+
+ select snap,
+ snapshot_xmin(snap),
+ snapshot_xmax(snap)
+ from snapshot_test order by nr;
+
+ select id, snapshot_contains(snap, id)
+ from snapshot_test, generate_series(11, 21) id
+ where nr = 2;
+
+ -- test current values also
+ select current_txid() >= snapshot_xmin(current_snapshot());
+ select current_txid() < snapshot_xmax(current_snapshot());
+ select snapshot_contains(current_snapshot(), current_txid());
+
+ -- pg_resync
+ select current_txid() >> 32;
+ select pg_sync_txid(400);
+ select current_txid() >> 32;
+ select pg_sync_txid(8589934592);
+ select current_txid() >> 32;
+ select pg_sync_txid(400);
+ select current_txid() >> 32;
+
Index: pgsql/src/test/regress/output/misc.source
===================================================================
*** pgsql.orig/src/test/regress/output/misc.source
--- pgsql/src/test/regress/output/misc.source
*************** SELECT user_relns() AS user_relns
*** 650,655 ****
--- 650,656 ----
road
shighway
slow_emp4000
+ snapshot_test
street
stud_emp
student
*************** SELECT user_relns() AS user_relns
*** 665,671 ****
toyemp
varchar_tbl
xacttest
! (99 rows)
SELECT name(equipment(hobby_construct(text 'skywalking', text 'mer')));
name
--- 666,672 ----
toyemp
varchar_tbl
xacttest
! (100 rows)
SELECT name(equipment(hobby_construct(text 'skywalking', text 'mer')));
name