On Fri, Jul 11, 2014 at 6:23 PM, Andres Freund <and...@2ndquadrant.com> wrote: > Ok. Do you plan to take care of it? If, I'd be fine with backpatching > it. I'm not likely to get to it right now :(
Actually I came up with the same need as Magnus, but a bit later, so... Attached is a patch to support physical slot creation and drop in pg_receivexlog with the addition of new options --create and --drop. It would be nice to have that in 9.4, but I will not the one deciding that at the end :) Code has been refactored with what is already available in pg_recvlogical for the slot creation and drop. Regards, -- Michael
From 62ef9bd097453648e4f313f1aedf91ba2b4a3f1e Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@otacoo.com> Date: Tue, 12 Aug 2014 21:54:13 +0900 Subject: [PATCH] Add support for physical slot creation/deletion in pg_receivexlog Physical slot creation can be done with --create and drop with --drop. In both cases --slot is needed. Code for replication slot creation and drop is refactored with what was available in pg_recvlogical. --- doc/src/sgml/ref/pg_receivexlog.sgml | 29 ++++++++++ src/bin/pg_basebackup/pg_receivexlog.c | 73 +++++++++++++++++++++---- src/bin/pg_basebackup/pg_recvlogical.c | 66 +++-------------------- src/bin/pg_basebackup/streamutil.c | 97 ++++++++++++++++++++++++++++++++++ src/bin/pg_basebackup/streamutil.h | 7 +++ 5 files changed, 203 insertions(+), 69 deletions(-) diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml index c15776f..9251f85 100644 --- a/doc/src/sgml/ref/pg_receivexlog.sgml +++ b/doc/src/sgml/ref/pg_receivexlog.sgml @@ -72,6 +72,35 @@ PostgreSQL documentation <title>Options</title> <para> + <application>pg_receivexlog</application> can run in one of two following + modes, which control physical replication slot: + + <variablelist> + + <varlistentry> + <term><option>--create</option></term> + <listitem> + <para> + Create a new physical replication slot with the name specified in + <option>--slot</option>, then exit. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><option>--drop</option></term> + <listitem> + <para> + Drop the replication slot with the name specified + in <option>--slot</option>, then exit. + </para> + </listitem> + </varlistentry> + </variablelist> + + </para> + + <para> The following command-line options control the location and format of the output. diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c index 0b7af54..67c56cb 100644 --- a/src/bin/pg_basebackup/pg_receivexlog.c +++ b/src/bin/pg_basebackup/pg_receivexlog.c @@ -38,6 +38,8 @@ static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; +static bool do_create_slot = false; +static bool do_drop_slot = false; static void usage(void); @@ -78,6 +80,9 @@ usage(void) printf(_(" -w, --no-password never prompt for password\n")); printf(_(" -W, --password force password prompt (should happen automatically)\n")); printf(_(" --slot=SLOTNAME replication slot to use\n")); + printf(_("\nOptional actions:\n")); + printf(_(" --create create a new replication slot (for the slot's name see --slot)\n")); + printf(_(" --drop drop the replication slot (for the slot's name see --slot)\n")); printf(_("\nReport bugs to <pgsql-b...@postgresql.org>.\n")); } @@ -261,14 +266,6 @@ StreamLog(void) uint32 hi, lo; - /* - * Connect in replication mode to the server - */ - conn = GetConnection(); - if (!conn) - /* Error message already written in GetConnection() */ - return; - if (!CheckServerVersionForStreaming(conn)) { /* @@ -370,6 +367,9 @@ main(int argc, char **argv) {"status-interval", required_argument, NULL, 's'}, {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, +/* action */ + {"create", no_argument, NULL, 1}, + {"drop", no_argument, NULL, 2}, {NULL, 0, NULL, 0} }; @@ -453,6 +453,13 @@ main(int argc, char **argv) case 'v': verbose++; break; +/* action */ + case 1: + do_create_slot = true; + break; + case 2: + do_drop_slot = true; + break; default: /* @@ -477,10 +484,26 @@ main(int argc, char **argv) exit(1); } + if (replication_slot == NULL && (do_drop_slot || do_create_slot)) + { + fprintf(stderr, _("%s: replication slot needed with action --create or --drop\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (do_drop_slot && do_create_slot) + { + fprintf(stderr, _("%s: cannot use --create together with --drop\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + /* * Required arguments */ - if (basedir == NULL) + if (basedir == NULL && !do_create_slot && !do_drop_slot) { fprintf(stderr, _("%s: no target directory specified\n"), progname); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), @@ -492,6 +515,38 @@ main(int argc, char **argv) pqsignal(SIGINT, sigint_handler); #endif + /* Obtain a connection before doing anything */ + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + exit(1); + + /* Drop a replication slot */ + if (do_drop_slot) + { + if (verbose) + fprintf(stderr, + _("%s: dropping replication slot \"%s\"\n"), + progname, replication_slot); + + if (!DropReplicationSlot(conn, true)) + disconnect_and_exit(1); + disconnect_and_exit(0); + } + + /* Create a replication slot */ + if (do_create_slot) + { + if (verbose) + fprintf(stderr, + _("%s: creating replication slot \"%s\"\n"), + progname, replication_slot); + + if (!CreateReplicationSlot(conn, NULL, true, NULL)) + disconnect_and_exit(1); + disconnect_and_exit(0); + } + while (true) { StreamLog(); diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index f3b0f34..475c69c 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -867,85 +867,31 @@ main(int argc, char **argv) } - /* - * stop a replication slot - */ + /* Drop a replication slot */ if (do_drop_slot) { - char query[256]; - if (verbose) fprintf(stderr, - _("%s: freeing replication slot \"%s\"\n"), + _("%s: dropping replication slot \"%s\"\n"), progname, replication_slot); - snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"", - replication_slot); - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, query, PQerrorMessage(conn)); - disconnect_and_exit(1); - } - - if (PQntuples(res) != 0 || PQnfields(res) != 0) - { - fprintf(stderr, - _("%s: could not stop logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 0, 0); + if (!DropReplicationSlot(conn, false)) disconnect_and_exit(1); - } - - PQclear(res); disconnect_and_exit(0); } - /* - * init a replication slot - */ + /* Create a replication slot */ if (do_create_slot) { - char query[256]; - if (verbose) fprintf(stderr, - _("%s: initializing replication slot \"%s\"\n"), + _("%s: creating replication slot \"%s\"\n"), progname, replication_slot); - snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", - replication_slot, plugin); - - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, query, PQerrorMessage(conn)); + if (!CreateReplicationSlot(conn, plugin, false, &startpos)) disconnect_and_exit(1); - } - - if (PQntuples(res) != 1 || PQnfields(res) != 4) - { - fprintf(stderr, - _("%s: could not init logical replication: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 4); - disconnect_and_exit(1); - } - - if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) - { - fprintf(stderr, - _("%s: could not parse transaction log location \"%s\"\n"), - progname, PQgetvalue(res, 0, 1)); - disconnect_and_exit(1); - } - startpos = ((uint64) hi) << 32 | lo; - - replication_slot = strdup(PQgetvalue(res, 0, 0)); - PQclear(res); } - if (!do_start_slot) disconnect_and_exit(0); diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index 1100260..27cee59 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -229,6 +229,103 @@ GetConnection(void) /* + * Create a replication slot for the given connection. This function + * returns true in case of success as well as the start position + * obtained after the slot creation. + */ +bool +CreateReplicationSlot(PGconn *conn, const char *plugin, + bool is_physical, XLogRecPtr *startpos) +{ + char query[256]; + PGresult *res; + uint32 hi, lo; + + Assert(is_physical && plugin == NULL || + !is_physical && plugin != NULL); + Assert(replication_slot != NULL); + + /* Build query */ + if (is_physical) + snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", + replication_slot); + else + snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", + replication_slot, plugin); + + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query, PQerrorMessage(conn)); + return false; + } + + if (PQntuples(res) != 1 || PQnfields(res) != 4) + { + fprintf(stderr, + _("%s: could not init %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, is_physical ? "physical" : "logical", + PQntuples(res), PQnfields(res), 1, 4); + return false; + } + + /* Check LSN format obtained as consistent point */ + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse transaction log location \"%s\"\n"), + progname, PQgetvalue(res, 0, 1)); + return false; + } + + /* Save start position for caller */ + if (startpos != NULL) + *startpos = ((uint64) hi) << 32 | lo; + + replication_slot = strdup(PQgetvalue(res, 0, 0)); + PQclear(res); + return true; +} + +/* + * Drop a replication slot for the given connection. This function + * returns true in case of success. + */ +bool +DropReplicationSlot(PGconn *conn, bool is_physical) +{ + char query[256]; + PGresult *res; + + Assert(replication_slot != NULL); + + /* Build query */ + snprintf(query, sizeof(query), "DROP_REPLICATION_SLOT \"%s\"", + replication_slot); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), + progname, query, PQerrorMessage(conn)); + return false; + } + + if (PQntuples(res) != 0 || PQnfields(res) != 0) + { + fprintf(stderr, + _("%s: could not stop %s replication: got %d rows and %d fields, expected %d rows and %d fields\n"), + progname, is_physical ? "physical" : "logical", + PQntuples(res), PQnfields(res), 0, 0); + return false; + } + + PQclear(res); + return true; +} + + +/* * Frontend version of GetCurrentTimestamp(), since we are not linked with * backend code. The protocol always uses integer timestamps, regardless of * server setting. diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index c36a37b..aeae892 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -1,5 +1,7 @@ #include "libpq-fe.h" +#include "access/xlogdefs.h" + extern const char *progname; extern char *connection_string; extern char *dbhost; @@ -14,6 +16,11 @@ extern PGconn *conn; extern PGconn *GetConnection(void); +/* Replication slot management */ +extern bool CreateReplicationSlot(PGconn *conn, const char *plugin, + bool is_physical, XLogRecPtr *startpos); +extern bool DropReplicationSlot(PGconn *conn, bool is_physical); + extern int64 feGetCurrentTimestamp(void); extern void feTimestampDifference(int64 start_time, int64 stop_time, long *secs, int *microsecs); -- 2.0.4
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers