On Mon, Aug 18, 2014 at 4:01 PM, Michael Paquier
<[email protected]> wrote:
> On Mon, Aug 18, 2014 at 3:48 PM, Fujii Masao <[email protected]> wrote:
>> On Mon, Aug 18, 2014 at 2:38 PM, Michael Paquier
>> <[email protected]> wrote:
> And now looking at your patch there is additional refactoring possible
> with IDENTIFY_SYSTEM and pg_basebackup as well...
And attached is a rebased patch doing so.
Regards,
--
Michael
From a75def1dc554023d94611887410926d779596749 Mon Sep 17 00:00:00 2001
From: Michael Paquier <[email protected]>
Date: Mon, 18 Aug 2014 14:32:44 +0900
Subject: [PATCH] Add support for physical slot creation/deletion in
pg_receivexlog
A physical replication slot can be created with --create and dropped with
--drop; in both cases --slot is required. The code for replication slot
creation and drop is refactored out of pg_recvlogical into common routines,
and the same is done for IDENTIFY_SYSTEM, which simplifies the whole set
and, as a bonus, pg_basebackup as well.
---
doc/src/sgml/ref/pg_receivexlog.sgml | 29 +++++++
src/bin/pg_basebackup/pg_basebackup.c | 21 +----
src/bin/pg_basebackup/pg_receivexlog.c | 108 +++++++++++++++---------
src/bin/pg_basebackup/pg_recvlogical.c | 119 ++++----------------------
src/bin/pg_basebackup/streamutil.c | 148 +++++++++++++++++++++++++++++++++
src/bin/pg_basebackup/streamutil.h | 9 ++
6 files changed, 276 insertions(+), 158 deletions(-)
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index 5916b8f..7e46005 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -72,6 +72,35 @@ PostgreSQL documentation
<title>Options</title>
<para>
+ <application>pg_receivexlog</application> can run in one of the two following
+ modes, which control a physical replication slot:
+
+ <variablelist>
+
+ <varlistentry>
+ <term><option>--create</option></term>
+ <listitem>
+ <para>
+ Create a new physical replication slot with the name specified in
+ <option>--slot</option>, then exit.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><option>--drop</option></term>
+ <listitem>
+ <para>
+ Drop the replication slot with the name specified
+ in <option>--slot</option>, then exit.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ </para>
+
+ <para>
The following command-line options control the location and format of the
output.
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 3d26e22..66c851b 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1582,8 +1582,8 @@ BaseBackup(void)
{
PGresult *res;
char *sysidentifier;
- uint32 latesttli;
- uint32 starttli;
+ TimeLineID latesttli;
+ TimeLineID starttli;
char *basebkp;
char escaped_label[MAXPGPATH];
char *maxrate_clause = NULL;
@@ -1637,23 +1637,8 @@ BaseBackup(void)
/*
* Run IDENTIFY_SYSTEM so we can get the timeline
*/
- res = PQexec(conn, "IDENTIFY_SYSTEM");
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+ if (!RunIdentifySystem(conn, &sysidentifier, &latesttli, NULL))
disconnect_and_exit(1);
- }
- if (PQntuples(res) != 1 || PQnfields(res) < 3)
- {
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 3);
- disconnect_and_exit(1);
- }
- sysidentifier = pg_strdup(PQgetvalue(res, 0, 0));
- latesttli = atoi(PQgetvalue(res, 0, 1));
- PQclear(res);
/*
* Start the actual backup
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index a8b9ad3..e87839a 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -38,6 +38,8 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
+static bool do_create_slot = false;
+static bool do_drop_slot = false;
static void usage(void);
@@ -78,6 +80,9 @@ usage(void)
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
+ printf(_("\nOptional actions:\n"));
+ printf(_(" --create create a new replication slot (for the slot's name see --slot)\n"));
+ printf(_(" --drop drop the replication slot (for the slot's name see --slot)\n"));
printf(_("\nReport bugs to <[email protected]>.\n"));
}
@@ -253,21 +258,10 @@ FindStreamingStart(uint32 *tli)
static void
StreamLog(void)
{
- PGresult *res;
XLogRecPtr startpos;
- uint32 starttli;
+ TimeLineID starttli;
XLogRecPtr serverpos;
- uint32 servertli;
- uint32 hi,
- lo;
-
- /*
- * Connect in replication mode to the server
- */
- conn = GetConnection();
- if (!conn)
- /* Error message already written in GetConnection() */
- return;
+ TimeLineID servertli;
if (!CheckServerVersionForStreaming(conn))
{
@@ -280,33 +274,11 @@ StreamLog(void)
}
/*
- * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
- * position.
+ * Identify server, obtaining start LSN position and current timeline ID
+ * at the same time.
*/
- res = PQexec(conn, "IDENTIFY_SYSTEM");
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
- disconnect_and_exit(1);
- }
- if (PQntuples(res) != 1 || PQnfields(res) < 3)
- {
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 3);
+ if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos))
disconnect_and_exit(1);
- }
- servertli = atoi(PQgetvalue(res, 0, 1));
- if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
- {
- fprintf(stderr,
- _("%s: could not parse transaction log location \"%s\"\n"),
- progname, PQgetvalue(res, 0, 2));
- disconnect_and_exit(1);
- }
- serverpos = ((uint64) hi) << 32 | lo;
- PQclear(res);
/*
* Figure out where to start streaming.
@@ -370,6 +342,9 @@ main(int argc, char **argv)
{"status-interval", required_argument, NULL, 's'},
{"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
+/* action */
+ {"create", no_argument, NULL, 1},
+ {"drop", no_argument, NULL, 2},
{NULL, 0, NULL, 0}
};
@@ -453,6 +428,13 @@ main(int argc, char **argv)
case 'v':
verbose++;
break;
+/* action */
+ case 1:
+ do_create_slot = true;
+ break;
+ case 2:
+ do_drop_slot = true;
+ break;
default:
/*
@@ -477,10 +459,26 @@ main(int argc, char **argv)
exit(1);
}
+ if (replication_slot == NULL && (do_drop_slot || do_create_slot))
+ {
+ fprintf(stderr, _("%s: replication slot needed with action --create or --drop\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ if (do_drop_slot && do_create_slot)
+ {
+ fprintf(stderr, _("%s: cannot use --create together with --drop\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
/*
* Required arguments
*/
- if (basedir == NULL)
+ if (basedir == NULL && !do_create_slot && !do_drop_slot)
{
fprintf(stderr, _("%s: no target directory specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
@@ -492,6 +490,38 @@ main(int argc, char **argv)
pqsignal(SIGINT, sigint_handler);
#endif
+ /* Obtain a connection before doing anything */
+ conn = GetConnection();
+ if (!conn)
+ /* Error message already written in GetConnection() */
+ exit(1);
+
+ /* Drop a replication slot */
+ if (do_drop_slot)
+ {
+ if (verbose)
+ fprintf(stderr,
+ _("%s: dropping replication slot \"%s\"\n"),
+ progname, replication_slot);
+
+ if (!DropReplicationSlot(conn, true))
+ disconnect_and_exit(1);
+ disconnect_and_exit(0);
+ }
+
+ /* Create a replication slot */
+ if (do_create_slot)
+ {
+ if (verbose)
+ fprintf(stderr,
+ _("%s: creating replication slot \"%s\"\n"),
+ progname, replication_slot);
+
+ if (!CreateReplicationSlot(conn, NULL, true))
+ disconnect_and_exit(1);
+ disconnect_and_exit(0);
+ }
+
while (true)
{
StreamLog();
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index f3b0f34..1e53c82 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -208,15 +208,6 @@ StreamLog(void)
query = createPQExpBuffer();
/*
- * Connect in replication mode to the server
- */
- if (!conn)
- conn = GetConnection();
- if (!conn)
- /* Error message already written in GetConnection() */
- return;
-
- /*
* Start the replication
*/
if (verbose)
@@ -596,7 +587,6 @@ sighup_handler(int signum)
int
main(int argc, char **argv)
{
- PGresult *res;
static struct option long_options[] = {
/* general options */
{"file", required_argument, NULL, 'f'},
@@ -834,121 +824,48 @@ main(int argc, char **argv)
#endif
/*
- * don't really need this but it actually helps to get more precise error
- * messages about authentication, required GUCs and such without starting
- * to loop around connection attempts lateron.
+ * Obtain a connection to server. This is not really necessary but it
+ * helps to get more precise error messages about authentication,
+ * required GUC parameters and such.
*/
- {
- conn = GetConnection();
- if (!conn)
- /* Error message already written in GetConnection() */
- exit(1);
-
- /*
- * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
- * position.
- */
- res = PQexec(conn, "IDENTIFY_SYSTEM");
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
- disconnect_and_exit(1);
- }
-
- if (PQntuples(res) != 1 || PQnfields(res) < 4)
- {
- fprintf(stderr,
- _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 4);
- disconnect_and_exit(1);
- }
- PQclear(res);
- }
-
+ conn = GetConnection();
+ if (!conn)
+ /* Error message already written in GetConnection() */
+ exit(1);
- /*
- * stop a replication slot
- */
+ /* Drop a replication slot */
if (do_drop_slot)
{
- char query[256];
-
if (verbose)
fprintf(stderr,
- _("%s: freeing replication slot \"%s\"\n"),
+ _("%s: dropping replication slot \"%s\"\n"),
progname, replication_slot);
- snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
- replication_slot);
- res = PQexec(conn, query);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, query, PQerrorMessage(conn));
- disconnect_and_exit(1);
- }
-
- if (PQntuples(res) != 0 || PQnfields(res) != 0)
- {
- fprintf(stderr,
- _("%s: could not stop logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
- progname, PQntuples(res), PQnfields(res), 0, 0);
+ if (!DropReplicationSlot(conn, false))
disconnect_and_exit(1);
- }
-
- PQclear(res);
disconnect_and_exit(0);
}
- /*
- * init a replication slot
- */
+ /* Create a replication slot */
if (do_create_slot)
{
- char query[256];
-
if (verbose)
fprintf(stderr,
- _("%s: initializing replication slot \"%s\"\n"),
+ _("%s: creating replication slot \"%s\"\n"),
progname, replication_slot);
- snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
- replication_slot, plugin);
-
- res = PQexec(conn, query);
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
- progname, query, PQerrorMessage(conn));
+ if (!CreateReplicationSlot(conn, plugin, false))
disconnect_and_exit(1);
- }
-
- if (PQntuples(res) != 1 || PQnfields(res) != 4)
- {
- fprintf(stderr,
- _("%s: could not init logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
- progname, PQntuples(res), PQnfields(res), 1, 4);
- disconnect_and_exit(1);
- }
-
- if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
- {
- fprintf(stderr,
- _("%s: could not parse transaction log location \"%s\"\n"),
- progname, PQgetvalue(res, 0, 1));
- disconnect_and_exit(1);
- }
- startpos = ((uint64) hi) << 32 | lo;
-
- replication_slot = strdup(PQgetvalue(res, 0, 0));
- PQclear(res);
}
-
if (!do_start_slot)
disconnect_and_exit(0);
+ /* Identify system, obtaining start LSN position at the same time */
+ if (!RunIdentifySystem(conn, NULL, NULL, &startpos))
+ disconnect_and_exit(1);
+
+ /* Stream loop */
while (true)
{
StreamLog();
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1100260..7549c8a 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -227,6 +227,154 @@ GetConnection(void)
return tmpconn;
}
+/*
+ * Run IDENTIFY_SYSTEM through a given connection and give back to caller
+ * some result information if requested.  Each output pointer may be NULL
+ * when the caller is not interested in the corresponding value:
+ * - sysid: system identifier, as a copy made with pg_strdup()
+ * - starttli: current timeline ID
+ * - startpos: LSN position reported by the server
+ *
+ * Returns true on success.  On failure false is returned, an error message
+ * is printed on stderr (except when conn is NULL) and none of the output
+ * parameters is modified.
+ */
+bool
+RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
+				  XLogRecPtr *startpos)
+{
+	PGresult   *res;
+	uint32		hi,
+				lo;
+
+	/* Leave if no connection present */
+	if (conn == NULL)
+		return false;
+
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * Only columns 0 to 2 are read below, so three fields are enough.
+	 * Requiring more would reject results that do not carry extra columns
+	 * even though nothing here needs them.
+	 */
+	if (PQntuples(res) != 1 || PQnfields(res) < 3)
+	{
+		fprintf(stderr,
+				_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
+				progname, PQntuples(res), PQnfields(res), 1, 3);
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * Get LSN start position if necessary.  This is the only step that can
+	 * still fail, so do it first: nothing is stored or allocated for the
+	 * caller when it does.
+	 */
+	if (startpos != NULL)
+	{
+		if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
+		{
+			fprintf(stderr,
+					_("%s: could not parse transaction log location \"%s\"\n"),
+					progname, PQgetvalue(res, 0, 2));
+			PQclear(res);
+			return false;
+		}
+		*startpos = ((uint64) hi) << 32 | lo;
+	}
+
+	/* Get timeline ID to start streaming from */
+	if (starttli != NULL)
+		*starttli = atoi(PQgetvalue(res, 0, 1));
+
+	/* Get system identifier */
+	if (sysid != NULL)
+		*sysid = pg_strdup(PQgetvalue(res, 0, 0));
+
+	PQclear(res);
+	return true;
+}
+
+/*
+ * Create a replication slot through the given connection, using the global
+ * replication_slot as slot name.  A physical slot is created when
+ * is_physical is true, in which case plugin must be NULL; otherwise a
+ * logical slot using the output plugin "plugin" is created.
+ *
+ * On success true is returned and replication_slot is replaced by a copy of
+ * the slot name reported by the server.  On failure an error message is
+ * printed on stderr and false is returned.  The LSN reported by the server
+ * is only checked for its format; it is not handed back to the caller.
+ */
+bool
+CreateReplicationSlot(PGconn *conn, const char *plugin,
+					  bool is_physical)
+{
+	char		query[256];
+	PGresult   *res;
+	uint32		hi,
+				lo;
+
+	Assert((is_physical && plugin == NULL) ||
+		   (!is_physical && plugin != NULL));
+	Assert(replication_slot != NULL);
+
+	/* Build query */
+	if (is_physical)
+		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
+				 replication_slot);
+	else
+		snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+				 replication_slot, plugin);
+
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, query, PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+
+	if (PQntuples(res) != 1 || PQnfields(res) != 4)
+	{
+		fprintf(stderr,
+				_("%s: could not init %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, is_physical ? "physical" : "logical",
+				PQntuples(res), PQnfields(res), 1, 4);
+		PQclear(res);
+		return false;
+	}
+
+	/* Check LSN format obtained as consistent point */
+	if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+	{
+		fprintf(stderr,
+				_("%s: could not parse transaction log location \"%s\"\n"),
+				progname, PQgetvalue(res, 0, 1));
+		PQclear(res);
+		return false;
+	}
+
+	/*
+	 * Use pg_strdup() rather than strdup() so that replication_slot can
+	 * never silently become NULL on out-of-memory.  The copy has to be made
+	 * before the result, which owns the string, is cleared.
+	 */
+	replication_slot = pg_strdup(PQgetvalue(res, 0, 0));
+	PQclear(res);
+	return true;
+}
+
+/*
+ * Drop the replication slot named by the global replication_slot through
+ * the given connection.  is_physical is only used to word the error message
+ * printed when the server returns an unexpected result shape.
+ *
+ * Returns true in case of success.  On failure an error message is printed
+ * on stderr and false is returned.
+ */
+bool
+DropReplicationSlot(PGconn *conn, bool is_physical)
+{
+	char		query[256];
+	PGresult   *res;
+
+	Assert(replication_slot != NULL);
+
+	/* Build query */
+	snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"",
+			 replication_slot);
+
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
+				progname, query, PQerrorMessage(conn));
+		PQclear(res);
+		return false;
+	}
+
+	/* A successful drop carries no result set at all */
+	if (PQntuples(res) != 0 || PQnfields(res) != 0)
+	{
+		fprintf(stderr,
+				_("%s: could not stop %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"),
+				progname, is_physical ? "physical" : "logical",
+				PQntuples(res), PQnfields(res), 0, 0);
+		PQclear(res);
+		return false;
+	}
+
+	PQclear(res);
+	return true;
+}
+
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index c36a37b..14bfffc 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -1,5 +1,7 @@
#include "libpq-fe.h"
+#include "access/xlogdefs.h"
+
extern const char *progname;
extern char *connection_string;
extern char *dbhost;
@@ -14,6 +16,13 @@ extern PGconn *conn;
extern PGconn *GetConnection(void);
+/* Replication commands */
+extern bool CreateReplicationSlot(PGconn *conn, const char *plugin,
+ bool is_physical);
+extern bool DropReplicationSlot(PGconn *conn, bool is_physical);
+extern bool RunIdentifySystem(PGconn *conn, char **sysid,
+ TimeLineID *starttli,
+ XLogRecPtr *startpos);
extern int64 feGetCurrentTimestamp(void);
extern void feTimestampDifference(int64 start_time, int64 stop_time,
long *secs, int *microsecs);
--
2.1.0
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers