On 2015-02-16 01:21:55 +0100, Andres Freund wrote: > Here's my next attempt at producing something we can agree > upon. > > The major change that might achieve that is that I've now provided a > separate method to store the origin_id of a node. I've made it > conditional on !REPLICATION_IDENTIFIER_REUSE_PADDING, to show both > paths. That new method uses Heikki's xlog rework to dynamically add the > origin to the record if an origin is set up. That works surprisingly > simply. > > Other changes: > > * Locking preventing several backends from replaying changes at the same > time. This is actually overly restrictive in some cases, but I think > good enough for now. > * Logical decoding grew a filter_by_origin callback that allows to > ignore changes that were replayed on a remote system. Such filters are > executed before much is done with records, potentially saving a fair > bit of costs. > * Rebased. That took a bit due to the xlog and other changes. > * A couple more SQL interface functions (like dropping a replication > identifier).
And here is an actual patch. Greetings, Andres Freund -- Andres Freund http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
>From 268d52cac6bf7fe1c019fd68248853c7c7ae18b1 Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Mon, 16 Feb 2015 01:22:08 +0100 Subject: [PATCH] Introduce replication identifiers to keep track of replication progress: v0.6 --- contrib/test_decoding/Makefile | 3 +- contrib/test_decoding/expected/replident.out | 84 ++ contrib/test_decoding/sql/replident.sql | 40 + contrib/test_decoding/test_decoding.c | 28 + src/backend/access/rmgrdesc/xactdesc.c | 17 +- src/backend/access/transam/xact.c | 64 +- src/backend/access/transam/xlog.c | 34 +- src/backend/access/transam/xloginsert.c | 22 +- src/backend/access/transam/xlogreader.c | 10 + src/backend/bootstrap/bootstrap.c | 5 +- src/backend/catalog/Makefile | 2 +- src/backend/catalog/catalog.c | 8 +- src/backend/catalog/system_views.sql | 7 + src/backend/replication/logical/Makefile | 3 +- src/backend/replication/logical/decode.c | 63 +- src/backend/replication/logical/logical.c | 5 + src/backend/replication/logical/reorderbuffer.c | 5 +- .../replication/logical/replication_identifier.c | 1190 ++++++++++++++++++++ src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/cache/syscache.c | 23 + src/backend/utils/misc/guc.c | 1 + src/bin/initdb/initdb.c | 1 + src/bin/pg_resetxlog/pg_resetxlog.c | 3 + src/include/access/xact.h | 10 +- src/include/access/xlog.h | 1 + src/include/access/xlogdefs.h | 6 + src/include/access/xlogreader.h | 9 + src/include/access/xlogrecord.h | 5 +- src/include/catalog/indexing.h | 6 + src/include/catalog/pg_proc.h | 28 + src/include/catalog/pg_replication_identifier.h | 75 ++ src/include/pg_config_manual.h | 6 + src/include/replication/output_plugin.h | 8 + src/include/replication/reorderbuffer.h | 8 +- src/include/replication/replication_identifier.h | 58 + src/include/utils/syscache.h | 2 + src/test/regress/expected/rules.out | 5 + src/test/regress/expected/sanity_check.out | 1 + 38 files changed, 1816 insertions(+), 33 deletions(-) create mode 100644 
contrib/test_decoding/expected/replident.out create mode 100644 contrib/test_decoding/sql/replident.sql create mode 100644 src/backend/replication/logical/replication_identifier.c create mode 100644 src/include/catalog/pg_replication_identifier.h create mode 100644 src/include/replication/replication_identifier.h diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 438be44..f8334cc 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -37,7 +37,8 @@ submake-isolation: submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding -REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared +REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \ + binary prepared replident regresscheck: all | submake-regress submake-test_decoding $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/replident.out b/contrib/test_decoding/expected/replident.out new file mode 100644 index 0000000..1c508a5 --- /dev/null +++ b/contrib/test_decoding/expected/replident.out @@ -0,0 +1,84 @@ +-- predictability +SET synchronous_commit = on; +CREATE TABLE origin_tbl(id serial primary key, data text); +CREATE TABLE target_tbl(id serial primary key, data text); +SELECT pg_replication_identifier_create('test_decoding: regression_slot'); + pg_replication_identifier_create +---------------------------------- + 1 +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? 
+---------- + init +(1 row) + +-- origin tx +INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again'); +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +-- as is normal, the insert into target_tbl shows up +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN' + table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again''' + table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT' + COMMIT +(5 rows) + +INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again'); +-- mark session as replaying +SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot'); + pg_replication_identifier_setup_replaying_from +------------------------------------------------ + +(1 row) + +BEGIN; +-- setup transaction origins +SELECT pg_replication_identifier_setup_tx_origin('0/ffffffff', '2013-01-01 00:00'); + pg_replication_identifier_setup_tx_origin +------------------------------------------- + +(1 row) + +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); +COMMIT; +SELECT pg_replication_identifier_reset_replaying_from(); + pg_replication_identifier_reset_replaying_from +------------------------------------------------ + +(1 row) + +-- and magically the replayed xact will be filtered! 
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + data +------ +(0 rows) + +--but new original changes still show up +INSERT INTO origin_tbl(data) VALUES ('will be replicated'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + data +-------------------------------------------------------------------------------- + BEGIN + table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated' + COMMIT +(3 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_replication_identifier_drop('test_decoding: regression_slot'); + pg_replication_identifier_drop +-------------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/replident.sql b/contrib/test_decoding/sql/replident.sql new file mode 100644 index 0000000..f01836f --- /dev/null +++ b/contrib/test_decoding/sql/replident.sql @@ -0,0 +1,40 @@ +-- predictability +SET synchronous_commit = on; + +CREATE TABLE origin_tbl(id serial primary key, data text); +CREATE TABLE target_tbl(id serial primary key, data text); + +SELECT pg_replication_identifier_create('test_decoding: regression_slot'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- origin tx +INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again'); +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- as is normal, the insert into target_tbl shows up +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again'); + +-- mark session as 
replaying +SELECT pg_replication_identifier_setup_replaying_from('test_decoding: regression_slot'); + +BEGIN; +-- setup transaction origins +SELECT pg_replication_identifier_setup_tx_origin('0/ffffffff', '2013-01-01 00:00'); +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); +COMMIT; + +SELECT pg_replication_identifier_reset_replaying_from(); + +-- and magically the replayed xact will be filtered! +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + +--but new original changes still show up +INSERT INTO origin_tbl(data) VALUES ('will be replicated'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + +SELECT pg_drop_replication_slot('regression_slot'); +SELECT pg_replication_identifier_drop('test_decoding: regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 963d5df..2ec3001 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -21,6 +21,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "replication/replication_identifier.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -43,6 +44,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool xact_wrote_changes; + bool only_local; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static bool pg_decode_filter(LogicalDecodingContext *ctx, + RepNodeId origin_id); void _PG_init(void) @@ -76,6 +80,7 
@@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; + cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; } @@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_xids = true; data->include_timestamp = false; data->skip_empty_xacts = false; + data->only_local = false; ctx->output_plugin_private = data; @@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "only-local") == 0) + { + + if (elem->arg == NULL) + data->only_local = true; + else if (!parse_bool(strVal(elem->arg), &data->only_local)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static bool +pg_decode_filter(LogicalDecodingContext *ctx, + RepNodeId origin_id) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepNodeId) + return true; + return false; +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. 
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 3e87978..0ec6b0f 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -25,9 +25,12 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) { int i; TransactionId *subxacts; + SharedInvalidationMessage *msgs; subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels]; + msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts]; + appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time)); if (xlrec->nrels > 0) @@ -49,9 +52,6 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) } if (xlrec->nmsgs > 0) { - SharedInvalidationMessage *msgs; - - msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts]; if (XactCompletionRelcacheInitFileInval(xlrec->xinfo)) appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u", @@ -80,6 +80,17 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec) appendStringInfo(buf, " unknown id %d", msg->id); } } + if (xlrec->xinfo & XACT_CONTAINS_ORIGIN) + { + xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]); + + appendStringInfo(buf, " origin %u, lsn %X/%X, at %s", + origin->origin_node_id, + (uint32)(origin->origin_lsn >> 32), + (uint32)origin->origin_lsn, + timestamptz_to_str(origin->origin_timestamp)); + } + } static void diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 97000ef..579f9cc 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -40,8 +40,10 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/walsender.h" #include "replication/syncrep.h" +#include "replication/replication_identifier.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/predicate.h" @@ -1080,9 +1082,10 @@ RecordTransactionCommit(void) * gracefully. Till then, it's just 20 bytes of overhead. 
*/ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit || - XLogLogicalInfoActive()) + XLogLogicalInfoActive() || replication_origin_id != InvalidRepNodeId) { xl_xact_commit xlrec; + xl_xact_origin origin; /* * Set flags required for recovery processing of commits. @@ -1115,6 +1118,19 @@ RecordTransactionCommit(void) if (nmsgs > 0) XLogRegisterData((char *) invalMessages, nmsgs * sizeof(SharedInvalidationMessage)); + /* dump transaction origin information */ + if (replication_origin_id != InvalidRepNodeId) + { + xlrec.xinfo |= XACT_CONTAINS_ORIGIN; + + origin.origin_node_id = replication_origin_id; + origin.origin_lsn = replication_origin_lsn; + origin.origin_timestamp = replication_origin_timestamp; + + XLogRegisterData((char *) &origin, + sizeof(xl_xact_origin)); + + } (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT); } else @@ -1135,6 +1151,13 @@ RecordTransactionCommit(void) } } + /* record plain commit ts if not replaying remote actions */ + if (replication_origin_id == InvalidRepNodeId || + replication_origin_id == DoNotReplicateRepNodeId) + replication_origin_timestamp = xactStopTimestamp; + else + AdvanceCachedReplicationIdentifier(replication_origin_lsn, XactLastRecEnd); + /* * We only need to log the commit timestamp separately if the node * identifier is a valid value; the commit record above already contains @@ -1146,7 +1169,7 @@ RecordTransactionCommit(void) node_id = CommitTsGetDefaultNodeId(); TransactionTreeSetCommitTsData(xid, nchildren, children, - xactStopTimestamp, + replication_origin_timestamp, node_id, node_id != InvalidCommitTsNodeId); } @@ -1230,9 +1253,11 @@ RecordTransactionCommit(void) if (wrote_xlog) SyncRepWaitForLSN(XactLastRecEnd); + /* remember end of last commit record */ + XactLastCommitEnd = XactLastRecEnd; + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; - cleanup: /* Clean up local data */ if (rels) @@ -4665,10 +4690,12 @@ xact_redo_commit_internal(TransactionId xid, 
XLogRecPtr lsn, SharedInvalidationMessage *inval_msgs, int nmsgs, RelFileNode *xnodes, int nrels, Oid dbId, Oid tsId, - uint32 xinfo) + uint32 xinfo, + xl_xact_origin *origin) { TransactionId max_xid; int i; + RepNodeId origin_node_id = InvalidRepNodeId; max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids); @@ -4688,9 +4715,18 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, LWLockRelease(XidGenLock); } + Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL)); + + if (xinfo & XACT_CONTAINS_ORIGIN) + { + origin_node_id = origin->origin_node_id; + commit_time = origin->origin_timestamp; + } + /* Set the transaction commit timestamp and metadata */ TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids, - commit_time, InvalidCommitTsNodeId, false); + commit_time, origin_node_id, false); + if (standbyState == STANDBY_DISABLED) { @@ -4747,6 +4783,14 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, StandbyReleaseLockTree(xid, 0, NULL); } + if (xinfo & XACT_CONTAINS_ORIGIN) + { + /* recover apply progress */ + AdvanceReplicationIdentifier(origin_node_id, + origin->origin_lsn, + lsn); + } + /* Make sure files supposed to be dropped are dropped */ if (nrels > 0) { @@ -4805,19 +4849,24 @@ xact_redo_commit(xl_xact_commit *xlrec, { TransactionId *subxacts; SharedInvalidationMessage *inval_msgs; + xl_xact_origin *origin = NULL; /* subxid array follows relfilenodes */ subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); /* invalidation messages array follows subxids */ inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); + if (xlrec->xinfo & XACT_CONTAINS_ORIGIN) + origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]); + xact_redo_commit_internal(xid, lsn, xlrec->xact_time, subxacts, xlrec->nsubxacts, inval_msgs, xlrec->nmsgs, xlrec->xnodes, xlrec->nrels, xlrec->dbId, xlrec->tsId, - xlrec->xinfo); + xlrec->xinfo, + origin); } /* @@ -4833,7 +4882,8 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec, 
NULL, 0, /* relfilenodes */ InvalidOid, /* dbId */ InvalidOid, /* tsId */ - 0); /* xinfo */ + 0, /* xinfo */ + NULL /* origin */); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 629a457..5eb0ef5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -44,6 +44,7 @@ #include "postmaster/startup.h" #include "replication/logical.h" #include "replication/slot.h" +#include "replication/replication_identifier.h" #include "replication/snapbuild.h" #include "replication/walreceiver.h" #include "replication/walsender.h" @@ -297,6 +298,8 @@ static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; +XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; + /* * RedoRecPtr is this backend's local copy of the REDO record pointer * (which is almost but not quite the same as a pointer to the most recent @@ -6014,6 +6017,11 @@ StartupXLOG(void) StartupMultiXact(); /* + * Recover knowledge about replay progress of known replication partners. + */ + StartupReplicationIdentifier(checkPoint.redo); + + /* * Initialize unlogged LSN. On a clean shutdown, it's restored from the * control file. On recovery, all unlogged relations are blown away, so * the unlogged LSN counter can be reset too. @@ -7645,6 +7653,7 @@ CreateCheckPoint(int flags) XLogRecPtr recptr; XLogCtlInsert *Insert = &XLogCtl->Insert; uint32 freespace; + XLogRecPtr oldRedoPtr; XLogSegNo _logSegNo; XLogRecPtr curInsert; VirtualTransactionId *vxids; @@ -7960,10 +7969,10 @@ CreateCheckPoint(int flags) (errmsg("concurrent transaction log activity while database system is shutting down"))); /* - * Select point at which we can truncate the log, which we base on the - * prior checkpoint's earliest info. + * Select point at which we can truncate the log (and other resources + * related to it), which we base on the prior checkpoint's earliest info. 
*/ - XLByteToSeg(ControlFile->checkPointCopy.redo, _logSegNo); + oldRedoPtr = ControlFile->checkPointCopy.redo; /* * Update the control file. @@ -8018,6 +8027,7 @@ CreateCheckPoint(int flags) * Delete old log files (those no longer needed even for previous * checkpoint or the standbys in XLOG streaming). */ + XLByteToSeg(oldRedoPtr, _logSegNo); if (_logSegNo) { KeepLogSeg(recptr, &_logSegNo); @@ -8047,6 +8057,13 @@ CreateCheckPoint(int flags) */ TruncateMultiXact(); + /* + * Remove old replication identifier checkpoints. We're using the previous + * checkpoint's redo ptr as a cutoff - even if we were to use that + * checkpoint to startup we're not going to need anything older. + */ + TruncateReplicationIdentifier(oldRedoPtr); + /* Real work is done, but log and update stats before releasing lock. */ LogCheckpointEnd(false); @@ -8130,6 +8147,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointSnapBuild(); CheckPointLogicalRewriteHeap(); CheckPointBuffers(flags); /* performs all required fsyncs */ + CheckPointReplicationIdentifier(checkPointRedo); /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); } @@ -8190,6 +8208,7 @@ CreateRestartPoint(int flags) { XLogRecPtr lastCheckPointRecPtr; CheckPoint lastCheckPoint; + XLogRecPtr oldRedoPtr; XLogSegNo _logSegNo; TimestampTz xtime; @@ -8289,7 +8308,7 @@ CreateRestartPoint(int flags) * Select point at which we can truncate the xlog, which we base on the * prior checkpoint's earliest info. */ - XLByteToSeg(ControlFile->checkPointCopy.redo, _logSegNo); + oldRedoPtr = ControlFile->checkPointCopy.redo; /* * Update pg_control, using current time. Check that it still shows @@ -8316,6 +8335,7 @@ CreateRestartPoint(int flags) * checkpoint/restartpoint) to prevent the disk holding the xlog from * growing full. 
*/ + XLByteToSeg(oldRedoPtr, _logSegNo); if (_logSegNo) { XLogRecPtr receivePtr; @@ -8385,6 +8405,12 @@ CreateRestartPoint(int flags) TruncateMultiXact(); /* + * Also truncate replication identifiers. c.f. CreateCheckPoint()'s + * comment. + */ + TruncateReplicationIdentifier(oldRedoPtr); + + /* * Truncate pg_subtrans if possible. We can throw away all data before * the oldest XMIN of any running transaction. No future transaction will * attempt to reference any pg_subtrans entry older than that (see Asserts diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index a1e2eb8..a91298b 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -25,6 +25,7 @@ #include "access/xloginsert.h" #include "catalog/pg_control.h" #include "miscadmin.h" +#include "replication/replication_identifier.h" #include "storage/bufmgr.h" #include "storage/proc.h" #include "utils/memutils.h" @@ -76,10 +77,16 @@ static uint32 mainrdata_len; /* total # of bytes in chain */ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; +#ifdef REPLICATION_IDENTIFIER_REUSE_PADDING +#define SizeOfXlogOrigin 0 +#else +#define SizeOfXlogOrigin (sizeof(RepNodeId) + sizeof(XLR_BLOCK_ID_ORIGIN)) +#endif + #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) /* * An array of XLogRecData structs, to hold registered data. 
@@ -629,6 +636,16 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(BlockNumber); } +#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING + /* followed by the record's origin, if any */ + if (replication_origin_id != InvalidRepNodeId) + { + *(scratch++) = XLR_BLOCK_ID_ORIGIN; + memcpy(scratch, &replication_origin_id, sizeof(replication_origin_id)); + scratch += sizeof(replication_origin_id); + } +#endif + /* followed by main data, if any */ if (mainrdata_len > 0) { @@ -674,6 +691,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, rechdr->xl_tot_len = total_len; rechdr->xl_info = info; rechdr->xl_rmid = rmid; +#ifdef REPLICATION_IDENTIFIER_REUSE_PADDING + rechdr->xl_origin_id = replication_origin_id; +#endif rechdr->xl_prev = InvalidXLogRecPtr; rechdr->xl_crc = rdata_crc; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 60470b5..f8233a0 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -20,6 +20,7 @@ #include "access/xlog_internal.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" +#include "replication/replication_identifier.h" static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); @@ -956,6 +957,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ResetDecoder(state); state->decoded_record = record; +#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING + state->record_origin = InvalidRepNodeId; +#endif ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -990,6 +994,12 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) break; /* by convention, the main data fragment is * always last */ } +#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING + else if (block_id == XLR_BLOCK_ID_ORIGIN) + { + COPY_HEADER_FIELD(&state->record_origin, sizeof(RepNodeId)); + } +#endif else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ diff --git a/src/backend/bootstrap/bootstrap.c 
b/src/backend/bootstrap/bootstrap.c index bc66eac..e2de408 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -705,10 +705,13 @@ DefineAttr(char *name, char *type, int attnum) * oidvector and int2vector are also treated as not-nullable, even though * they are no longer fixed-width. */ + /* FIXME!!!! */ #define MARKNOTNULL(att) \ ((att)->attlen > 0 || \ (att)->atttypid == OIDVECTOROID || \ - (att)->atttypid == INT2VECTOROID) + (att)->atttypid == INT2VECTOROID || \ + strcmp(NameStr((att)->attname), "riname") == 0 \ + ) if (MARKNOTNULL(attrtypes[attnum])) { diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index a403c64..5b04550 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \ pg_ts_parser.h pg_ts_template.h pg_extension.h \ pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \ - pg_foreign_table.h pg_policy.h \ + pg_foreign_table.h pg_policy.h pg_replication_identifier.h \ pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \ toasting.h indexing.h \ ) diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c index 8e7a9ec..318d65a 100644 --- a/src/backend/catalog/catalog.c +++ b/src/backend/catalog/catalog.c @@ -32,6 +32,7 @@ #include "catalog/pg_namespace.h" #include "catalog/pg_pltemplate.h" #include "catalog/pg_db_role_setting.h" +#include "catalog/pg_replication_identifier.h" #include "catalog/pg_shdepend.h" #include "catalog/pg_shdescription.h" #include "catalog/pg_shseclabel.h" @@ -224,7 +225,8 @@ IsSharedRelation(Oid relationId) relationId == SharedDependRelationId || relationId == SharedSecLabelRelationId || relationId == TableSpaceRelationId || - relationId == DbRoleSettingRelationId) + relationId == DbRoleSettingRelationId || + relationId == ReplicationIdentifierRelationId) return true; 
/* These are their indexes (see indexing.h) */ if (relationId == AuthIdRolnameIndexId || @@ -240,7 +242,9 @@ IsSharedRelation(Oid relationId) relationId == SharedSecLabelObjectIndexId || relationId == TablespaceOidIndexId || relationId == TablespaceNameIndexId || - relationId == DbRoleSettingDatidRolidIndexId) + relationId == DbRoleSettingDatidRolidIndexId || + relationId == ReplicationLocalIdentIndex || + relationId == ReplicationExternalIdentIndex) return true; /* These are their toast tables and toast indexes (see toasting.h) */ if (relationId == PgShdescriptionToastTable || diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5e69e2b..4559f99 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -769,6 +769,13 @@ CREATE VIEW pg_user_mappings AS REVOKE ALL on pg_user_mapping FROM public; + +CREATE VIEW pg_replication_identifier_progress AS + SELECT * + FROM pg_get_replication_identifier_progress(); + +REVOKE ALL ON pg_replication_identifier_progress FROM public; + -- -- We have a few function definitions in here, too. 
-- At some point there might be enough to justify breaking them out into diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 310a45c..95bcffb 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) -OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o +OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \ + snapbuild.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 77c02ba..f8f7016 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -40,6 +40,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/replication_identifier.h" #include "replication/snapbuild.h" #include "storage/standby.h" @@ -67,7 +68,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, TransactionId xid, Oid dboid, TimestampTz commit_time, int nsubxacts, TransactionId *sub_xids, - int ninval_msgs, SharedInvalidationMessage *msg); + int ninval_msgs, SharedInvalidationMessage *msg, + xl_xact_origin *origin); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, TransactionId *sub_xids, int nsubxacts); @@ -201,16 +203,20 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_commit *xlrec; TransactionId *subxacts = NULL; SharedInvalidationMessage *invals = NULL; + xl_xact_origin *origin = NULL; xlrec = (xl_xact_commit *) XLogRecGetData(r); subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); + if (xlrec->xinfo & XACT_CONTAINS_ORIGIN) + origin = (xl_xact_origin *) 
&(invals[xlrec->nmsgs]); + DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId, xlrec->xact_time, xlrec->nsubxacts, subxacts, - xlrec->nmsgs, invals); + xlrec->nmsgs, invals, origin); break; } @@ -220,6 +226,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_commit *xlrec; TransactionId *subxacts; SharedInvalidationMessage *invals = NULL; + xl_xact_origin *origin = NULL; /* Prepared commits contain a normal commit record... */ prec = (xl_xact_commit_prepared *) XLogRecGetData(r); @@ -228,10 +235,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); + if (xlrec->xinfo & XACT_CONTAINS_ORIGIN) + origin = (xl_xact_origin *) &(invals[xlrec->nmsgs]); + DecodeCommit(ctx, buf, prec->xid, xlrec->dbId, xlrec->xact_time, xlrec->nsubxacts, subxacts, - xlrec->nmsgs, invals); + xlrec->nmsgs, invals, origin); break; } @@ -244,7 +254,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid, xlrec->xact_time, xlrec->nsubxacts, xlrec->subxacts, - 0, NULL); + 0, NULL, NULL); break; } case XLOG_XACT_ABORT: @@ -480,10 +490,19 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, TransactionId xid, Oid dboid, TimestampTz commit_time, int nsubxacts, TransactionId *sub_xids, - int ninval_msgs, SharedInvalidationMessage *msgs) + int ninval_msgs, SharedInvalidationMessage *msgs, + xl_xact_origin *origin) { + RepNodeId origin_id = InvalidRepNodeId; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; int i; + if (origin != NULL) + { + origin_id = origin->origin_node_id; + origin_lsn = origin->origin_lsn; + } + /* * Process invalidation messages, even if we're not interested in the * transaction's contents, since the various caches need to always be @@ -504,12 +523,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * the 
reorderbuffer to forget the content of the (sub-)transactions * if not. * - * There basically two reasons we might not be interested in this + * There can be several reasons we might not be interested in this * transaction: * 1) We might not be interested in decoding transactions up to this * LSN. This can happen because we previously decoded it and now just * are restarting or if we haven't assembled a consistent snapshot yet. * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if @@ -524,7 +544,9 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * --- */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (dboid != InvalidOid && dboid != ctx->slot->data.database)) + (dboid != InvalidOid && dboid != ctx->slot->data.database) || + (ctx->callbacks.filter_by_origin_cb && + ctx->callbacks.filter_by_origin_cb(ctx, origin_id))) { for (i = 0; i < nsubxacts; i++) { @@ -546,7 +568,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* replay actions of all transaction + subtransactions in order */ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time); + commit_time, origin_id, origin_lsn); } /* @@ -590,8 +612,14 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (ctx->callbacks.filter_by_origin_cb && + ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) @@ -632,8 +660,14 @@ 
DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (ctx->callbacks.filter_by_origin_cb && + ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) @@ -681,8 +715,14 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (ctx->callbacks.filter_by_origin_cb && + ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); @@ -726,6 +766,11 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (rnode.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (ctx->callbacks.filter_by_origin_cb && + ctx->callbacks.filter_by_origin_cb(ctx, XLogRecGetOrigin(r))) + return; + tupledata = XLogRecGetBlockData(r, 0, &tuplelen); data = tupledata; @@ -738,6 +783,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = XLogRecGetOrigin(r); + memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 30baa45..638a663 100644 --- a/src/backend/replication/logical/logical.c +++ 
b/src/backend/replication/logical/logical.c @@ -39,6 +39,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/replication_identifier.h" #include "replication/snapbuild.h" #include "storage/proc.h" @@ -46,6 +47,10 @@ #include "utils/memutils.h" +RepNodeId replication_origin_id = InvalidRepNodeId; /* assumed identity */ +XLogRecPtr replication_origin_lsn; +TimestampTz replication_origin_timestamp; + /* data for errcontext callback */ typedef struct LogicalErrorCallbackState { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index bcd5896..30086c9 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1255,7 +1255,8 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time) + TimestampTz commit_time, + RepNodeId origin_id, XLogRecPtr origin_lsn) { ReorderBufferTXN *txn; volatile Snapshot snapshot_now; @@ -1273,6 +1274,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; /* serialize the last bunch of changes if we need start earlier anyway */ if (txn->nentries_mem != txn->nentries) diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c new file mode 100644 index 0000000..1364cea --- /dev/null +++ b/src/backend/replication/logical/replication_identifier.c @@ -0,0 +1,1190 @@ +/*------------------------------------------------------------------------- + * + * replication_identifier.c + * Logical Replication Node Identifier and replication progress persistency + * support. 
+ * + * Copyright (c) 2013-2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/replication_identifier.c + * + */ + +#include "postgres.h" + +#include <unistd.h> +#include <sys/stat.h> + +#include "funcapi.h" +#include "miscadmin.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/xact.h" + +#include "catalog/indexing.h" + +#include "nodes/execnodes.h" + +#include "replication/replication_identifier.h" +#include "replication/logical.h" + +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/copydir.h" + +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/pg_lsn.h" +#include "utils/rel.h" +#include "utils/syscache.h" +#include "utils/tqual.h" + +/* + * Replay progress of a single remote node. + */ +typedef struct ReplicationState +{ + /* + * Local identifier for the remote node. + */ + RepNodeId local_identifier; + + /* + * Location of the latest commit from the remote side. + */ + XLogRecPtr remote_lsn; + + /* + * Remember the local lsn of the commit record so we can XLogFlush() to it + * during a checkpoint so we know the commit record actually is safe on + * disk. + */ + XLogRecPtr local_lsn; + + /* + * Slot is setup in backend? + */ + pid_t acquired_by; +} ReplicationState; + +/* + * On disk version of ReplicationState. + */ +typedef struct ReplicationStateOnDisk +{ + RepNodeId local_identifier; + XLogRecPtr remote_lsn; +} ReplicationStateOnDisk; + + +/* + * Base address into a shared memory array of replication states of size + * max_replication_slots. + * + * XXX: Should we use a separate variable to size this rather than + * max_replication_slots? 
+ */ +static ReplicationState *ReplicationStates; + +/* + * Backend-local, cached element from ReplicationStates for use in a backend + * replaying remote commits, so we don't have to search ReplicationStates for + * the backend's current RepNodeId. + */ +static ReplicationState *local_replication_state = NULL; + +/* Magic for on disk files. */ +#define REPLICATION_STATE_MAGIC (uint32)0x1257DADE + +/* XXX: move to c.h? */ +#ifndef UINT16_MAX +#define UINT16_MAX ((1<<16) - 1) +#else +#if UINT16_MAX != ((1<<16) - 1) +#error "uh, wrong UINT16_MAX?" +#endif +#endif + +/* + * Check for a persistent replication identifier identified by the replication + * identifier's external name. + * + * Returns InvalidOid if the node isn't known yet. + */ +RepNodeId +GetReplicationIdentifier(char *riname, bool missing_ok) +{ + Form_pg_replication_identifier ident; + Oid riident = InvalidOid; + HeapTuple tuple; + Datum riname_d; + + riname_d = CStringGetTextDatum(riname); + + tuple = SearchSysCache1(REPLIDREMOTE, riname_d); + if (HeapTupleIsValid(tuple)) + { + ident = (Form_pg_replication_identifier) GETSTRUCT(tuple); + riident = ident->riident; + ReleaseSysCache(tuple); + } + else if (!missing_ok) + elog(ERROR, "cache lookup failed for replication identifier named %s", + riname); + + return riident; +} + +/* + * Create a persistent replication identifier. + * + * Needs to be called in a transaction. + */ +RepNodeId +CreateReplicationIdentifier(char *riname) +{ + Oid riident; + HeapTuple tuple = NULL; + Relation rel; + Datum riname_d; + SnapshotData SnapshotDirty; + SysScanDesc scan; + ScanKeyData key; + + riname_d = CStringGetTextDatum(riname); + + Assert(IsTransactionState()); + + /* + * We need the numeric replication identifiers to be 16bit wide, so we + * cannot rely on the normal oid allocation. So we simply scan + * pg_replication_identifier for the first unused id. 
That's not + * particularly efficient, but this should be an fairly infrequent + * operation - we can easily spend a bit more code on this when it turns + * out it needs to be faster. + * + * We handle concurrency by taking an exclusive lock (allowing reads!) + * over the table for the duration of the search. Because we use a "dirty + * snapshot" we can read rows that other in-progress sessions have + * written, even though they would be invisible with normal snapshots. Due + * to the exclusive lock there's no danger that new rows can appear while + * we're checking. + */ + InitDirtySnapshot(SnapshotDirty); + + rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock); + + for (riident = InvalidOid + 1; riident < UINT16_MAX; riident++) + { + bool nulls[Natts_pg_replication_identifier]; + Datum values[Natts_pg_replication_identifier]; + bool collides; + CHECK_FOR_INTERRUPTS(); + + ScanKeyInit(&key, + Anum_pg_replication_riident, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(riident)); + + scan = systable_beginscan(rel, ReplicationLocalIdentIndex, + true /* indexOK */, + &SnapshotDirty, + 1, &key); + + collides = HeapTupleIsValid(systable_getnext(scan)); + + systable_endscan(scan); + + if (!collides) + { + /* + * Ok, found an unused riident, insert the new row and do a CCI, + * so our callers can look it up if they want to. 
+ */ + memset(&nulls, 0, sizeof(nulls)); + + values[Anum_pg_replication_riident -1] = ObjectIdGetDatum(riident); + values[Anum_pg_replication_riname - 1] = riname_d; + + tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); + simple_heap_insert(rel, tuple); + CatalogUpdateIndexes(rel, tuple); + CommandCounterIncrement(); + break; + } + } + + /* now release lock again, */ + heap_close(rel, ExclusiveLock); + + if (tuple == NULL) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("no free replication id could be found"))); + + heap_freetuple(tuple); + return riident; +} + + +/* + * Drop a persistent replication identifier. + * + * Needs to be called in a transaction. + */ +void +DropReplicationIdentifier(RepNodeId riident) +{ + HeapTuple tuple = NULL; + Relation rel; + SnapshotData SnapshotDirty; + SysScanDesc scan; + ScanKeyData key; + int i; + + Assert(IsTransactionState()); + + InitDirtySnapshot(SnapshotDirty); + + rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock); + + /* cleanup the slot state info */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *state = &ReplicationStates[i]; + + /* found our slot */ + if (state->local_identifier == riident) + { + if (state->acquired_by != 0) + { + elog(ERROR, "cannot drop slot that is setup in backend %d", + state->acquired_by); + } + memset(state, 0, sizeof(ReplicationState)); + break; + } + } + + ScanKeyInit(&key, + Anum_pg_replication_riident, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(riident)); + + scan = systable_beginscan(rel, ReplicationLocalIdentIndex, + true /* indexOK */, + &SnapshotDirty, + 1, &key); + + tuple = systable_getnext(scan); + + if (HeapTupleIsValid(tuple)) + simple_heap_delete(rel, &tuple->t_self); + + systable_endscan(scan); + + CommandCounterIncrement(); + + /* now release lock again, */ + heap_close(rel, ExclusiveLock); +} + + +/* + * Lookup pg_replication_identifier tuple via its riident. 
+ * + * On success *riname is set to a pstrdup'ed copy of the identifier's name; if + * the lookup failed and missing_ok is true, *riname is left unmodified. + */ +void +GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok, char **riname) +{ + HeapTuple tuple; + Form_pg_replication_identifier ric; + + Assert(OidIsValid((Oid) riident)); + Assert(riident != InvalidRepNodeId); + Assert(riident != DoNotReplicateRepNodeId); + + tuple = SearchSysCache1(REPLIDIDENT, + ObjectIdGetDatum((Oid) riident)); + + if (HeapTupleIsValid(tuple)) + { + ric = (Form_pg_replication_identifier) GETSTRUCT(tuple); + *riname = pstrdup(text_to_cstring(&ric->riname)); + } + + if (!HeapTupleIsValid(tuple) && !missing_ok) + elog(ERROR, "cache lookup failed for replication identifier id: %u", + riident); + + if (HeapTupleIsValid(tuple)) + ReleaseSysCache(tuple); +} + +static void +CheckReplicationIdentifierPrerequisites(bool check_slots) +{ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("only superusers can query or manipulate replication identifiers"))); + + if (check_slots && max_replication_slots == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot query or manipulate replication identifiers when max_replication_slots = 0"))); + +} + +Datum +pg_replication_identifier_get(PG_FUNCTION_ARGS) +{ + char *name; + RepNodeId riident; + + CheckReplicationIdentifierPrerequisites(false); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + riident = GetReplicationIdentifier(name, true); + + pfree(name); + + if (OidIsValid(riident)) + PG_RETURN_OID(riident); + PG_RETURN_NULL(); +} + + +Datum +pg_replication_identifier_create(PG_FUNCTION_ARGS) +{ + char *name; + RepNodeId riident; + + CheckReplicationIdentifierPrerequisites(false); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + riident = CreateReplicationIdentifier(name); + + pfree(name); + + PG_RETURN_OID(riident); +} + +Datum 
+pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS) +{ + char *name; + RepNodeId origin; + + CheckReplicationIdentifierPrerequisites(true); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + origin = GetReplicationIdentifier(name, false); + SetupCachedReplicationIdentifier(origin); + + replication_origin_id = origin; + + pfree(name); + + PG_RETURN_VOID(); +} + +Datum +pg_replication_identifier_is_replaying(PG_FUNCTION_ARGS) +{ + CheckReplicationIdentifierPrerequisites(true); + + PG_RETURN_BOOL(replication_origin_id != InvalidRepNodeId); +} + +Datum +pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS) +{ + CheckReplicationIdentifierPrerequisites(true); + + TeardownCachedReplicationIdentifier(); + + replication_origin_id = InvalidRepNodeId; + + PG_RETURN_VOID(); +} + + +Datum +pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS) +{ + XLogRecPtr location = PG_GETARG_LSN(0); + + CheckReplicationIdentifierPrerequisites(true); + + if (local_replication_state == NULL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("need to setup the origin id first"))); + + replication_origin_lsn = location; + replication_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1); + + PG_RETURN_VOID(); +} + +Datum +pg_get_replication_identifier_progress(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int i; +#define REPLICATION_IDENTIFIER_PROGRESS_COLS 4 + + /* we want to return 0 rows if slot is set to zero */ + CheckReplicationIdentifierPrerequisites(false); + + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + 
errmsg("materialize mode required, but it is not allowed in this context"))); + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + if (tupdesc->natts != REPLICATION_IDENTIFIER_PROGRESS_COLS) + elog(ERROR, "wrong function definition"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + /* prevent slots from being concurrently dropped */ + LockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock); + + /* + * Iterate through all possible ReplicationStates, display if they are + * filled. Note that we do not take any locks, so slightly corrupted/out + * of date values are a possibility. + */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *state; + Datum values[REPLICATION_IDENTIFIER_PROGRESS_COLS]; + bool nulls[REPLICATION_IDENTIFIER_PROGRESS_COLS]; + char *riname; + + state = &ReplicationStates[i]; + + /* unused slot, nothing to display */ + if (state->local_identifier == InvalidRepNodeId) + continue; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[ 0] = ObjectIdGetDatum(state->local_identifier); + + GetReplicationInfoByIdentifier(state->local_identifier, true, &riname); + + /* + * We're not preventing the identifier to be dropped concurrently, so + * silently accept that it might be gone. 
+ */ + if (!riname) + continue; + + values[ 1] = CStringGetTextDatum(riname); + + values[ 2] = LSNGetDatum(state->remote_lsn); + + values[ 3] = LSNGetDatum(state->local_lsn); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + tuplestore_donestoring(tupstore); + + UnlockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock); + +#undef REPLICATION_IDENTIFIER_PROGRESS_COLS + + return (Datum) 0; +} + +Datum +pg_replication_identifier_advance(PG_FUNCTION_ARGS) +{ + text *name = PG_GETARG_TEXT_P(0); + XLogRecPtr remote_commit = PG_GETARG_LSN(1); + XLogRecPtr local_commit = PG_GETARG_LSN(2); + RepNodeId node; + + CheckReplicationIdentifierPrerequisites(true); + + /* lock to prevent the replication identifier from vanishing */ + LockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock); + + node = GetReplicationIdentifier(text_to_cstring(name), false); + + AdvanceReplicationIdentifier(node, remote_commit, local_commit); + + UnlockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock); + + PG_RETURN_VOID(); +} + +Datum +pg_replication_identifier_drop(PG_FUNCTION_ARGS) +{ + char *name; + RepNodeId riident; + + CheckReplicationIdentifierPrerequisites(false); + + name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); + + riident = GetReplicationIdentifier(name, false); + Assert(OidIsValid(riident)); + + DropReplicationIdentifier(riident); + + pfree(name); + + PG_RETURN_VOID(); +} + +Size +ReplicationIdentifierShmemSize(void) +{ + Size size = 0; + + /* + * FIXME: max_replication_slots is the wrong thing to use here, here we keep + * the replay state of *remote* transactions. 
+ */ + if (max_replication_slots == 0) + return size; + + size = add_size(size, + mul_size(max_replication_slots, sizeof(ReplicationState))); + return size; +} + +void +ReplicationIdentifierShmemInit(void) +{ + bool found; + + if (max_replication_slots == 0) + return; + + ReplicationStates = (ReplicationState *) + ShmemInitStruct("ReplicationIdentifierState", + ReplicationIdentifierShmemSize(), + &found); + + if (!found) + { + MemSet(ReplicationStates, 0, ReplicationIdentifierShmemSize()); + } +} + +/* --------------------------------------------------------------------------- + * Perform a checkpoint of replication identifier's progress with respect to + * the replayed remote_lsn. Make sure that all transactions we refer to in the + * checkpoint (local_lsn) are actually on-disk. This might not yet be the case + * if the transactions were originally committed asynchronously. + * + * We store checkpoints in the following format: + * +-------+------------------------+------------------+-----+--------+ + * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF + * +-------+------------------------+------------------+-----+--------+ + * + * So its just the magic, followed by the statically sized + * ReplicationStateOnDisk structs. Note that the maximum number of + * ReplicationStates is determined by max_replication_slots. + * + * FIXME: Add a CRC32 to the end. + * --------------------------------------------------------------------------- + */ +void +CheckPointReplicationIdentifier(XLogRecPtr ckpt) +{ + char tmppath[MAXPGPATH]; + char path[MAXPGPATH]; + int fd; + int tmpfd; + int i; + uint32 magic = REPLICATION_STATE_MAGIC; + pg_crc32 crc; + + if (max_replication_slots == 0) + return; + + INIT_CRC32C(crc); + + /* + * Write to a filename a LSN of the checkpoint's REDO pointer, so we can + * deal with the checkpoint failing after + * CheckPointReplicationIdentifier() finishing. 
+ */ + sprintf(path, "pg_logical/checkpoints/%X-%X.ckpt", + (uint32)(ckpt >> 32), (uint32)ckpt); + sprintf(tmppath, "pg_logical/checkpoints/%X-%X.ckpt.tmp", + (uint32)(ckpt >> 32), (uint32)ckpt); + + /* check whether file already exists */ + fd = OpenTransientFile(path, + O_RDONLY | PG_BINARY, + 0); + + /* usual case, no checkpoint performed yet */ + if (fd < 0 && errno == ENOENT) + ; + else if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not check replication state checkpoint \"%s\": %m", + path))); + /* already checkpointed before crash during a checkpoint or so */ + else + { + CloseTransientFile(fd); + return; + } + + /* make sure no old temp file is remaining */ + if (unlink(tmppath) < 0 && errno != ENOENT) + ereport(PANIC, (errmsg("failed while unlinking %s", path))); + + /* + * no other backend can perform this at the same time, we're protected by + * CheckpointLock. + */ + tmpfd = OpenTransientFile(tmppath, + O_CREAT | O_EXCL | O_WRONLY | PG_BINARY, + S_IRUSR | S_IWUSR); + if (tmpfd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not create replication identifier checkpoint \"%s\": %m", + tmppath))); + + /* write magic */ + if ((write(tmpfd, &magic, sizeof(magic))) != + sizeof(magic)) + { + CloseTransientFile(tmpfd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write replication identifier checkpoint \"%s\": %m", + tmppath))); + } + COMP_CRC32C(crc, &magic, sizeof(magic)); + + /* write actual data */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationStateOnDisk disk_state; + + /* XXX: Locking */ + + if (ReplicationStates[i].local_identifier == InvalidRepNodeId) + continue; + + disk_state.local_identifier = ReplicationStates[i].local_identifier; + disk_state.remote_lsn = ReplicationStates[i].remote_lsn; + + /* make sure we only write out a commit that's persistent */ + XLogFlush(ReplicationStates[i].local_lsn); + + if ((write(tmpfd, &disk_state, sizeof(disk_state))) != + 
sizeof(disk_state)) + { + CloseTransientFile(tmpfd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write replication identifier checkpoint \"%s\": %m", + tmppath))); + } + + COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); + } + + /* write out the CRC */ + FIN_CRC32C(crc); + if ((write(tmpfd, &crc, sizeof(crc))) != + sizeof(crc)) + { + CloseTransientFile(tmpfd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write replication identifier checkpoint \"%s\": %m", + tmppath))); + } + + /* fsync the file */ + if (pg_fsync(tmpfd) != 0) + { + CloseTransientFile(tmpfd); + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not fsync replication identifier checkpoint \"%s\": %m", + tmppath))); + } + + CloseTransientFile(tmpfd); + + /* rename to permanent file, fsync file and directory */ + if (rename(tmppath, path) != 0) + { + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m", + tmppath, path))); + } + + fsync_fname("pg_logical/checkpoints", true); + fsync_fname(path, false); +} + +/* + * Remove old replication identifier checkpoints that cannot possibly be + * needed anymore for crash recovery. 
+ */ +void +TruncateReplicationIdentifier(XLogRecPtr cutoff) +{ + DIR *snap_dir; + struct dirent *snap_de; + char path[MAXPGPATH]; + + snap_dir = AllocateDir("pg_logical/checkpoints"); + while ((snap_de = ReadDir(snap_dir, "pg_logical/checkpoints")) != NULL) + { + uint32 hi; + uint32 lo; + XLogRecPtr lsn; + struct stat statbuf; + + if (strcmp(snap_de->d_name, ".") == 0 || + strcmp(snap_de->d_name, "..") == 0) + continue; + + snprintf(path, MAXPGPATH, "pg_logical/checkpoints/%s", snap_de->d_name); + + if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode)) + { + elog(DEBUG1, "only regular files expected: %s", path); + continue; + } + + if (sscanf(snap_de->d_name, "%X-%X.ckpt", &hi, &lo) != 2) + { + ereport(LOG, + (errmsg("could not parse filename \"%s\"", path))); + continue; + } + + lsn = ((uint64) hi) << 32 | lo; + + /* check whether we still need it */ + if (lsn < cutoff) + { + elog(DEBUG2, "removing replication identifier checkpoint %s", path); + + /* + * It's not particularly harmful, though strange, if we can't + * remove the file here. Don't prevent the checkpoint from + * completing, that'd be cure worse than the disease. + */ + if (unlink(path) < 0) + { + ereport(LOG, + (errcode_for_file_access(), + errmsg("could not unlink file \"%s\": %m", + path))); + continue; + } + } + else + { + elog(DEBUG2, "keeping replication identifier checkpoint %s", path); + } + } + FreeDir(snap_dir); +} + +/* + * Recover replication replay status from checkpoint data saved earlier by + * CheckPointReplicationIdentifier. + * + * This only needs to be called at startup and *not* during every checkpoint + * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All + * state thereafter can be recovered by looking at commit records. 
+ */ +void +StartupReplicationIdentifier(XLogRecPtr ckpt) +{ + char path[MAXPGPATH]; + int fd; + int readBytes; + uint32 magic = REPLICATION_STATE_MAGIC; + int last_state = 0; + pg_crc32 file_crc; + pg_crc32 crc; + + /* don't want to overwrite already existing state */ +#ifdef USE_ASSERT_CHECKING + static bool already_started = false; + Assert(!already_started); + already_started = true; +#endif + + if (max_replication_slots == 0) + return; + + INIT_CRC32C(crc); + + elog(LOG, "starting up replication identifier with ckpt at %X/%X", + (uint32)(ckpt >> 32), (uint32)ckpt); + + sprintf(path, "pg_logical/checkpoints/%X-%X.ckpt", + (uint32)(ckpt >> 32), (uint32)ckpt); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0); + + /* + * might have had max_replication_slots == 0 last run, or we just brought up a + * standby. + */ + if (fd < 0 && errno == ENOENT) + return; + else if (fd < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not open replication state checkpoint \"%s\": %m", + path))); + + /* verify magic, thats written even if nothing was active */ + readBytes = read(fd, &magic, sizeof(magic)); + if (readBytes != sizeof(magic)) + ereport(PANIC, + (errmsg("could not read replication state checkpoint magic \"%s\": %m", + path))); + COMP_CRC32C(crc, &magic, sizeof(magic)); + + if (magic != REPLICATION_STATE_MAGIC) + ereport(PANIC, + (errmsg("replication checkpoint has wrong magic %u instead of %u", + magic, REPLICATION_STATE_MAGIC))); + + /* recover individual states, until there are no more to be found */ + while (true) + { + ReplicationStateOnDisk disk_state; + + readBytes = read(fd, &disk_state, sizeof(disk_state)); + + /* no further data */ + if (readBytes == sizeof(crc)) + { + /* not pretty, but simple ... 
*/ + file_crc = *(pg_crc32*) &disk_state; + break; + } + + if (readBytes < 0) + { + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read replication checkpoint file \"%s\": %m", + path))); + } + + if (readBytes != sizeof(disk_state)) + { + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not read replication checkpoint file \"%s\": read %d of %zu", + path, readBytes, sizeof(disk_state)))); + } + + COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); + + if (last_state == max_replication_slots) + ereport(PANIC, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("no free replication state could be found, increase max_replication_slots"))); + + /* copy data to shared memory */ + ReplicationStates[last_state].local_identifier = disk_state.local_identifier; + ReplicationStates[last_state].remote_lsn = disk_state.remote_lsn; + last_state++; + + elog(LOG, "recovered replication state of node %u to %X/%X", + disk_state.local_identifier, + (uint32)(disk_state.remote_lsn >> 32), + (uint32)disk_state.remote_lsn); + } + + /* now check checksum */ + FIN_CRC32C(crc); + if (file_crc != crc) + ereport(PANIC, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("replication_slot_checkpoint has wrong checksum %u, expected %u", + crc, file_crc))); + + CloseTransientFile(fd); +} + +/* + * Tell the replication identifier machinery that a commit from 'node' that + * originated at the LSN remote_commit on the remote node was replayed + * successfully and that we don't need to do so again. In combination with + * setting up replication_origin_lsn and replication_origin_id that ensures we + * won't lose knowledge about that after a crash if the transaction had a + * persistent effect (think of asynchronous commits). + * + * local_commit needs to be a local LSN of the commit so that we can make sure + * upon a checkpoint that enough WAL has been persisted to disk. 
+ * + * Needs to be called with a RowExclusiveLock on pg_replication_identifier, + * unless running in recovery. + */ +void +AdvanceReplicationIdentifier(RepNodeId node, + XLogRecPtr remote_commit, + XLogRecPtr local_commit) +{ + int i; + int free_slot = -1; + ReplicationState *replication_state = NULL; + + Assert(node != InvalidRepNodeId); + + /* we don't track DoNotReplicateRepNodeId */ + if (node == DoNotReplicateRepNodeId) + return; + + /* + * XXX: should we restore into a hashtable and dump into shmem only after + * recovery finished? + */ + + /* check whether slot already exists */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *curstate = &ReplicationStates[i]; + + /* remember where to insert if necessary */ + if (curstate->local_identifier == InvalidRepNodeId && + free_slot == -1) + { + free_slot = i; + continue; + } + + /* not our slot */ + if (curstate->local_identifier != node) + continue; + + if (curstate->acquired_by != 0) + { + elog(ERROR, "cannot advance slot that is setup in backend %d", + curstate->acquired_by); + } + + /* ok, found slot */ + replication_state = curstate; + break; + } + + if (replication_state == NULL && free_slot == -1) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("no free replication state could be found for %u, increase max_replication_slots", + node))); + /* initialize new slot */ + else if (replication_state == NULL) + { + replication_state = &ReplicationStates[free_slot]; + Assert(replication_state->remote_lsn == InvalidXLogRecPtr); + Assert(replication_state->local_lsn == InvalidXLogRecPtr); + replication_state->local_identifier = node; + } + + Assert(replication_state->local_identifier != InvalidRepNodeId); + + /* + * Due to - harmless - race conditions during a checkpoint we could see + * values here that are older than the ones we already have in + * memory. Don't overwrite those. 
+ */ + if (replication_state->remote_lsn < remote_commit) + replication_state->remote_lsn = remote_commit; + if (replication_state->local_lsn < local_commit) + replication_state->local_lsn = local_commit; +} + +/* + * Tear down a (possibly) cached replication identifier during process exit. + */ +static void +ReplicationIdentifierExitCleanup(int code, Datum arg) +{ + if (local_replication_state != NULL && + local_replication_state->acquired_by == MyProcPid) + { + local_replication_state->acquired_by = 0; + local_replication_state = NULL; + } +} + +/* + * Set up a replication identifier in the shared memory struct if it doesn't + * already exist and cache access to the specific ReplicationState so the + * array doesn't have to be searched when calling + * AdvanceCachedReplicationIdentifier(). + * + * Obviously only one such cached identifier can exist per process and the + * current cached value can only be set again after the previous value is torn + * down with TeardownCachedReplicationIdentifier(). + */ +void +SetupCachedReplicationIdentifier(RepNodeId node) +{ + static bool registered_cleanup; + int i; + int free_slot = -1; + + if (!registered_cleanup) + { + on_shmem_exit(ReplicationIdentifierExitCleanup, 0); + registered_cleanup = true; + } + + Assert(max_replication_slots > 0); + + if (local_replication_state != NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot setup replication origin when one is already setup"))); + + LockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock); + + /* + * Search for either an existing slot for that identifier or a free one we + * can use. 
+ */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationState *curstate = &ReplicationStates[i]; + + /* remember where to insert if necessary */ + if (curstate->local_identifier == InvalidRepNodeId && + free_slot == -1) + { + free_slot = i; + continue; + } + + /* not our slot */ + if (curstate->local_identifier != node) + continue; + + else if (curstate->acquired_by != 0) + { + elog(ERROR, "cannot setup slot that is already setup in backend %d", + curstate->acquired_by); + } + + local_replication_state = curstate; + } + + + if (local_replication_state == NULL && free_slot == -1) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("no free replication state could be found for %u, increase max_replication_slots", + node))); + else if (local_replication_state == NULL) + { + local_replication_state = &ReplicationStates[free_slot]; + Assert(local_replication_state->remote_lsn == InvalidXLogRecPtr); + Assert(local_replication_state->local_lsn == InvalidXLogRecPtr); + local_replication_state->local_identifier = node; + } + + Assert(local_replication_state->local_identifier != InvalidRepNodeId); + + local_replication_state->acquired_by = MyProcPid; + + UnlockRelationOid(ReplicationIdentifierRelationId, RowExclusiveLock); +} + +/* + * Make currently cached replication identifier unavailable so a new one can + * be setup with SetupCachedReplicationIdentifier(). + * + * This function may only be called if a previous identifier was setup with + * SetupCachedReplicationIdentifier(). + */ +void +TeardownCachedReplicationIdentifier(void) +{ + Assert(max_replication_slots != 0); + + if (local_replication_state == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("no replication identifier is set up"))); + + local_replication_state->acquired_by = 0; + local_replication_state = NULL; +} + +/* + * Do the same work AdvanceReplicationIdentifier() does, just on a pre-cached + * identifier. 
This is noticeably cheaper if you only ever work on a single + * replication identifier. + */ +void +AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit, + XLogRecPtr local_commit) +{ + Assert(local_replication_state != NULL); + Assert(local_replication_state->local_identifier != InvalidRepNodeId); + + if (local_replication_state->local_lsn < local_commit) + local_replication_state->local_lsn = local_commit; + if (local_replication_state->remote_lsn < remote_commit) + local_replication_state->remote_lsn = remote_commit; +} + +/* + * Ask the machinery about the point up to which we successfully replayed + * changes from an already set up & cached replication identifier. + */ +XLogRecPtr +RemoteCommitFromCachedReplicationIdentifier(void) +{ + Assert(local_replication_state != NULL); + return local_replication_state->remote_lsn; +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 16b9808..e927698 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -31,6 +31,7 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "replication/replication_identifier.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/ipc.h" @@ -132,6 +133,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, CheckpointerShmemSize()); size = add_size(size, AutoVacuumShmemSize()); size = add_size(size, ReplicationSlotsShmemSize()); + size = add_size(size, ReplicationIdentifierShmemSize()); size = add_size(size, WalSndShmemSize()); size = add_size(size, WalRcvShmemSize()); size = add_size(size, BTreeShmemSize()); @@ -238,6 +240,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) CheckpointerShmemInit(); AutoVacuumShmemInit(); ReplicationSlotsShmemInit(); + ReplicationIdentifierShmemInit(); WalSndShmemInit(); WalRcvShmemInit(); diff --git a/src/backend/utils/cache/syscache.c 
b/src/backend/utils/cache/syscache.c index bd27168..fdccb95 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -54,6 +54,7 @@ #include "catalog/pg_shdepend.h" #include "catalog/pg_shdescription.h" #include "catalog/pg_shseclabel.h" +#include "catalog/pg_replication_identifier.h" #include "catalog/pg_statistic.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_ts_config.h" @@ -620,6 +621,28 @@ static const struct cachedesc cacheinfo[] = { }, 128 }, + {ReplicationIdentifierRelationId, /* REPLIDIDENT */ + ReplicationLocalIdentIndex, + 1, + { + Anum_pg_replication_riident, + 0, + 0, + 0 + }, + 16 + }, + {ReplicationIdentifierRelationId, /* REPLIDREMOTE */ + ReplicationExternalIdentIndex, + 1, + { + Anum_pg_replication_riname, + 0, + 0, + 0 + }, + 16 + }, {RewriteRelationId, /* RULERELNAME */ RewriteRelRulenameIndexId, 2, diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 9572777..fd2d32f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -58,6 +58,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" +#include "replication/logical.h" #include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walreceiver.h" diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 18614e7..c2a5e15 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -202,6 +202,7 @@ static const char *subdirs[] = { "pg_stat", "pg_stat_tmp", "pg_logical", + "pg_logical/checkpoints", "pg_logical/snapshots", "pg_logical/mappings" }; diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c index a16089f..3ae42b8 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.c +++ b/src/bin/pg_resetxlog/pg_resetxlog.c @@ -55,6 +55,8 @@ #include "common/fe_memutils.h" #include "storage/large_object.h" #include "pg_getopt.h" +#include "replication/logical.h" +#include 
"replication/replication_identifier.h" static ControlFileData ControlFile; /* pg_control values */ @@ -1088,6 +1090,7 @@ WriteEmptyXLOG(void) record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_rmid = RM_XLOG_ID; + record->xl_origin_id = InvalidRepNodeId; recptr += SizeOfXLogRecord; *(recptr++) = XLR_BLOCK_ID_DATA_SHORT; *(recptr++) = sizeof(CheckPoint); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 8205504..8bc047b 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -146,10 +146,18 @@ typedef struct xl_xact_commit RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */ /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */ /* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */ + /* xl_xact_origin follows if xinfo specifies it */ } xl_xact_commit; #define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes) +typedef struct xl_xact_origin +{ + XLogRecPtr origin_lsn; + RepNodeId origin_node_id; + TimestampTz origin_timestamp; +} xl_xact_origin; + /* * These flags are set in the xinfo fields of WAL commit records, * indicating a variety of additional actions that need to occur @@ -160,7 +168,7 @@ typedef struct xl_xact_commit */ #define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01 #define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02 - +#define XACT_CONTAINS_ORIGIN 0x04 /* Access macros for above flags */ #define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) #define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 138deaf..f06d11f 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -85,6 +85,7 @@ typedef enum } RecoveryTargetType; extern XLogRecPtr XactLastRecEnd; +extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd; extern bool reachedConsistency; diff --git 
a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 6638c1d..bd8dd70 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -45,6 +45,12 @@ typedef uint64 XLogSegNo; typedef uint32 TimeLineID; /* + * Denotes the node on which the action causing a WAL record to be logged + * originated. + */ +typedef uint16 RepNodeId; + +/* * Because O_DIRECT bypasses the kernel buffers, and because we never * read those buffers except during crash recovery or if wal_level != minimal, * it is a win to use it in all cases where we sync on each write(). We could diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 74bec20..ef05879 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -125,6 +125,10 @@ struct XLogReaderState uint32 main_data_len; /* main data portion's length */ uint32 main_data_bufsz; /* allocated size of the buffer */ +#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING + RepNodeId record_origin; +#endif + /* information about blocks referenced by the record. 
*/ DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; @@ -184,6 +188,11 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, #define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) +#ifdef REPLICATION_IDENTIFIER_REUSE_PADDING +#define XLogRecGetOrigin(decoder) ((decoder)->decoded_record->xl_origin_id) +#else +#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) +#endif #define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index 25a9265..048e45f 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -45,7 +45,7 @@ typedef struct XLogRecord XLogRecPtr xl_prev; /* ptr to previous record in log */ uint8 xl_info; /* flag bits, see below */ RmgrId xl_rmid; /* resource manager for this record */ - /* 2 bytes of padding here, initialize to zero */ + RepNodeId xl_origin_id; /* what node did originally cause this record to be written */ pg_crc32 xl_crc; /* CRC for this record */ /* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */ @@ -174,5 +174,8 @@ typedef struct XLogRecordDataHeaderLong #define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_LONG 254 +#ifndef REPLICATION_IDENTIFIER_REUSE_PADDING +#define XLR_BLOCK_ID_ORIGIN 253 +#endif #endif /* XLOGRECORD_H */ diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h index a680229..405528d 100644 --- a/src/include/catalog/indexing.h +++ b/src/include/catalog/indexing.h @@ -305,6 +305,12 @@ DECLARE_UNIQUE_INDEX(pg_policy_oid_index, 3257, on pg_policy using btree(oid oid DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid 
oid_ops, polname name_ops)); #define PolicyPolrelidPolnameIndexId 3258 +DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 6001, on pg_replication_identifier using btree(riident oid_ops)); +#define ReplicationLocalIdentIndex 6001 + +DECLARE_UNIQUE_INDEX(pg_replication_identifier_riname_index, 6002, on pg_replication_identifier using btree(riname varchar_pattern_ops)); +#define ReplicationExternalIdentIndex 6002 + /* last step of initialization script: build the indexes declared above */ BUILD_INDICES diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 9edfdb8..3765b38 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5143,6 +5143,34 @@ DESCR("rank of hypothetical row without gaps"); DATA(insert OID = 3993 ( dense_rank_final PGNSP PGUID 12 1 0 2276 0 f f f f f f i 2 0 20 "2281 2276" "{2281,2276}" "{i,v}" _null_ _null_ hypothetical_dense_rank_final _null_ _null_ _null_ )); DESCR("aggregate final function"); +/* replication_identifier.h */ +DATA(insert OID = 6003 ( pg_replication_identifier_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_create _null_ _null_ _null_ )); +DESCR("create local replication identifier for the passed external one"); + +DATA(insert OID = 6004 ( pg_replication_identifier_get PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_get _null_ _null_ _null_ )); +DESCR("translate the external node identifier to a local one"); + +DATA(insert OID = 6005 ( pg_replication_identifier_setup_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_replaying_from _null_ _null_ _null_ )); +DESCR("setup from which node we are replaying transactions from currently"); + +DATA(insert OID = 6006 ( pg_replication_identifier_reset_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ 
_null_ pg_replication_identifier_reset_replaying_from _null_ _null_ _null_ )); +DESCR("reset replay mode"); + +DATA(insert OID = 6007 ( pg_replication_identifier_setup_tx_origin PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_tx_origin _null_ _null_ _null_ )); +DESCR("setup transaction timestamp and origin lsn"); + +DATA(insert OID = 6008 ( pg_get_replication_identifier_progress PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ pg_get_replication_identifier_progress _null_ _null_ _null_ )); +DESCR("replication identifier progress"); + +DATA(insert OID = 6009 ( pg_replication_identifier_is_replaying PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_replication_identifier_is_replaying _null_ _null_ _null_ )); +DESCR("is a replication identifier setup"); + +DATA(insert OID = 6010 ( pg_replication_identifier_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v 3 0 2278 "25 3220 3220" _null_ _null_ _null_ _null_ pg_replication_identifier_advance _null_ _null_ _null_ )); +DESCR("advance replication itentifier to specific location"); + +DATA(insert OID = 6011 ( pg_replication_identifier_drop PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_drop _null_ _null_ _null_ )); +DESCR("drop existing replication identifier"); + /* * Symbolic values for provolatile column: these indicate whether the result diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h new file mode 100644 index 0000000..26eec17 --- /dev/null +++ b/src/include/catalog/pg_replication_identifier.h @@ -0,0 +1,75 @@ +/*------------------------------------------------------------------------- + * + * pg_replication_identifier.h + * Persistent Replication Node Identifiers + * + * Portions Copyright (c) 1996-2015, PostgreSQL 
Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_replication_identifier.h + * + * NOTES + * the genbki.pl script reads this file and generates .bki + * information from the DATA() statements. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_REPLICATION_IDENTIFIER_H +#define PG_REPLICATION_IDENTIFIER_H + +#include "catalog/genbki.h" +#include "access/xlogdefs.h" + +/* ---------------- + * pg_replication_identifier. cpp turns this into + * typedef struct FormData_pg_replication_identifier + * ---------------- + */ +#define ReplicationIdentifierRelationId 6000 + +CATALOG(pg_replication_identifier,6000) BKI_SHARED_RELATION BKI_WITHOUT_OIDS +{ + /* + * locally known identifier that gets included in WAL. + * + * This should never leave the system. + * + * Needs to fit into a uint16, so we don't waste too much space in WAL + * records. For this reason we don't use a normal Oid column here, since + * we need to handle allocation of new values manually. + */ + Oid riident; + + /* + * Variable-length fields start here, but we allow direct access to + * riname. + */ + + /* external, free-format, identifier */ + text riname; +#ifdef CATALOG_VARLEN /* further variable-length fields */ +#endif +} FormData_pg_replication_identifier; + +/* ---------------- + * Form_pg_replication_identifier corresponds to a pointer to a tuple with + * the format of pg_replication_identifier relation. 
+ * ---------------- + */ +typedef FormData_pg_replication_identifier *Form_pg_replication_identifier; + +/* ---------------- + * compiler constants for pg_replication_identifier + * ---------------- + */ + +#define Natts_pg_replication_identifier 2 +#define Anum_pg_replication_riident 1 +#define Anum_pg_replication_riname 2 + +/* ---------------- + * pg_replication_identifier has no initial contents + * ---------------- + */ + +#endif /* PG_REPLICATION_IDENTIFIER_H */ diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h index 5cfc0ae..c787523 100644 --- a/src/include/pg_config_manual.h +++ b/src/include/pg_config_manual.h @@ -265,6 +265,12 @@ #endif /* + * Temporary switch to change between using xlog padding or a separate block + * id in the record to record the xlog origin of a record. + */ +/* #define REPLICATION_IDENTIFIER_REUSE_PADDING */ + +/* * Define this to cause palloc()'d memory to be filled with random data, to * facilitate catching code that depends on the contents of uninitialized * memory. Caution: this is horrendously expensive. diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 0935c1b..26095b1 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -74,6 +74,13 @@ typedef void (*LogicalDecodeCommitCB) ( XLogRecPtr commit_lsn); /* + * Filter changes by origin. + */ +typedef bool (*LogicalDecodeFilterByOriginCB) ( + struct LogicalDecodingContext *, + RepNodeId origin_id); + +/* * Called to shutdown an output plugin. 
*/ typedef void (*LogicalDecodeShutdownCB) ( @@ -89,6 +96,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeCommitCB commit_cb; + LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 5a1d9a0..784abd6 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -64,6 +64,8 @@ typedef struct ReorderBufferChange /* The type of change. */ enum ReorderBufferChangeType action; + RepNodeId origin_id; + /* * Context data for the change, which part of the union is valid depends * on action/action_internal. @@ -162,6 +164,10 @@ typedef struct ReorderBufferTXN */ XLogRecPtr restart_decoding_lsn; + /* origin of the change that caused this transaction */ + RepNodeId origin_id; + XLogRecPtr origin_lsn; + /* * Commit time, only known when we read the actual commit record. 
*/ @@ -335,7 +341,7 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *); void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, - TimestampTz commit_time); + TimestampTz commit_time, RepNodeId origin_id, XLogRecPtr origin_lsn); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h new file mode 100644 index 0000000..36d74aa --- /dev/null +++ b/src/include/replication/replication_identifier.h @@ -0,0 +1,58 @@ +/*------------------------------------------------------------------------- + * replication_identifier.h + * XXX + * + * Copyright (c) 2013-2015, PostgreSQL Global Development Group + * + *------------------------------------------------------------------------- + */ +#ifndef REPLICATION_IDENTIFIER_H +#define REPLICATION_IDENTIFIER_H + +#include "catalog/pg_replication_identifier.h" +#include "replication/logical.h" + +#define InvalidRepNodeId 0 +#define DoNotReplicateRepNodeId USHRT_MAX + +extern PGDLLIMPORT RepNodeId replication_origin_id; +extern PGDLLIMPORT XLogRecPtr replication_origin_lsn; +extern PGDLLIMPORT TimestampTz replication_origin_timestamp; + +/* API for querying & manipulating replication identifiers */ +extern RepNodeId GetReplicationIdentifier(char *name, bool missing_ok); +extern RepNodeId CreateReplicationIdentifier(char *name); +extern void GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok, + char **riname); +extern void DropReplicationIdentifier(RepNodeId riident); + +extern void AdvanceReplicationIdentifier(RepNodeId node, + XLogRecPtr remote_commit, + XLogRecPtr local_commit); 
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit, + XLogRecPtr local_commit); +extern void SetupCachedReplicationIdentifier(RepNodeId node); +extern void TeardownCachedReplicationIdentifier(void); +extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void); + +/* crash recovery support */ +extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt); +extern void TruncateReplicationIdentifier(XLogRecPtr cutoff); +extern void StartupReplicationIdentifier(XLogRecPtr ckpt); + +/* internals */ +extern Size ReplicationIdentifierShmemSize(void); +extern void ReplicationIdentifierShmemInit(void); + +/* SQL callable functions */ +extern Datum pg_replication_identifier_get(PG_FUNCTION_ARGS); +extern Datum pg_replication_identifier_create(PG_FUNCTION_ARGS); +extern Datum pg_replication_identifier_drop(PG_FUNCTION_ARGS); +extern Datum pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS); +extern Datum pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS); +extern Datum pg_replication_identifier_is_replaying(PG_FUNCTION_ARGS); +extern Datum pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS); +extern Datum pg_get_replication_identifier_progress(PG_FUNCTION_ARGS); +extern Datum pg_replication_identifier_advance(PG_FUNCTION_ARGS); + +#endif diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index ba0b090..d7be45a 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -77,6 +77,8 @@ enum SysCacheIdentifier RANGETYPE, RELNAMENSP, RELOID, + REPLIDIDENT, + REPLIDREMOTE, RULERELNAME, STATRELATTINH, TABLESPACEOID, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index d50b103..5609503 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1390,6 +1390,11 @@ pg_prepared_xacts| SELECT p.transaction, FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid) LEFT JOIN pg_authid u ON 
((p.ownerid = u.oid))) LEFT JOIN pg_database d ON ((p.dbid = d.oid))); +pg_replication_identifier_progress| SELECT pg_get_replication_identifier_progress.local_id, + pg_get_replication_identifier_progress.external_id, + pg_get_replication_identifier_progress.remote_lsn, + pg_get_replication_identifier_progress.local_lsn + FROM pg_get_replication_identifier_progress() pg_get_replication_identifier_progress(local_id, external_id, remote_lsn, local_lsn); pg_replication_slots| SELECT l.slot_name, l.plugin, l.slot_type, diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index c7be273..400cba3 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -121,6 +121,7 @@ pg_pltemplate|t pg_policy|t pg_proc|t pg_range|t +pg_replication_identifier|t pg_rewrite|t pg_seclabel|t pg_shdepend|t -- 2.0.0.rc2.4.g1dc51c6.dirty
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers