Dear Euler,

Thanks for updating the patch!

>v24-0003: as I said I don't think we need to add it, however, I won't fight
>against it if people want to add this check.

OK, let's wait for comments from senior members.

>Since I applied v24-0004, I realized that extra start / stop service are
>required. It mean pg_createsubscriber doesn't start the transformation with the
>current standby settings. Instead, it stops the standby if it is running and
>start it with the provided command-line options (socket, port,
>listen_addresses). It has a few drawbacks:
>* See v34-0012. It cannot detect if the target server is a primary for another
>  server. It is documented.

Yeah, it is collateral damage.

>* I also removed the check for standby is running. If the standby was stopped a
>  long time ago, it will take some time to reach the start point.
>* Dry run mode has to start / stop the service to work correctly. Is it an
>  issue?

One concern (see the comment below) is that the -l option would not be passed
even if the standby had been logging before pg_createsubscriber was run. Also,
some settings passed via pg_ctl start -o .... would not be restored.

>However, I decided to include --retain option, I'm thinking about to remove it.
>If the logging is enabled, the information during the pg_createsubscriber will
>be available. The client log can be redirected to a file for future inspection.

Just to confirm: you meant the following, right?
* the client output would be redirected, and
* -r option would be removed.

Here are my initial comments for v25-0001. I read the new doc and it looks very
good. I may review v25-0001 further, but feel free to revise it in the meantime.

01. cleanup_objects_atexit
```
        PGconn     *conn;
        int                     i;
```
The declaration of conn can be moved into the for-loop. Also, the loop counter
can be declared in the for statement itself.

02. cleanup_objects_atexit
```

                                /*
                                 * If a connection could not be established, inform the user
                                 * that some objects were left on primary and should be
                                 * removed before trying again.
                                 */
                                if (dbinfo[i].made_publication)
                                {
                                        pg_log_warning("There might be a publication \"%s\" in database \"%s\" on primary",
                                                                   dbinfo[i].pubname, dbinfo[i].dbname);
                                        pg_log_warning_hint("Consider dropping this publication before trying again.");
                                }
                                if (dbinfo[i].made_replslot)
                                {
                                        pg_log_warning("There might be a replication slot \"%s\" in database \"%s\" on primary",
                                                                   dbinfo[i].subname, dbinfo[i].dbname);
                                        pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
                                }
```

Not sure which is better, but we may be able to write the list to a concrete
file, as pg_upgrade does.
(I thought this had already been discussed, but I could not find it in the
archive. Sorry if this is a duplicate comment.)
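
To make the idea concrete, here is a minimal sketch of such a report writer.
Everything in it is hypothetical: struct leftover_info is a cut-down stand-in
for the patch's struct LogicalRepInfo, and write_leftover_report() and the line
format are made up for illustration. It only shows the shape of the output, not
where the file should live.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical, cut-down stand-in for the patch's struct LogicalRepInfo. */
struct leftover_info
{
	const char *dbname;
	const char *pubname;
	const char *slotname;
	bool		made_publication;
	bool		made_replslot;
};

/*
 * Write one line per object that may still exist on the primary.
 * Returns the number of lines written.
 */
int
write_leftover_report(FILE *out, const struct leftover_info *info, int n)
{
	int			written = 0;

	assert(out != NULL && info != NULL);

	/* The loop counter is declared in the for statement (see comment 01). */
	for (int i = 0; i < n; i++)
	{
		if (info[i].made_publication)
		{
			fprintf(out, "publication \"%s\" in database \"%s\"\n",
					info[i].pubname, info[i].dbname);
			written++;
		}
		if (info[i].made_replslot)
		{
			fprintf(out, "replication slot \"%s\" in database \"%s\"\n",
					info[i].slotname, info[i].dbname);
			written++;
		}
	}

	return written;
}
```

The warnings could still be emitted as they are now; the file would just give
the user one place to look after a failed run.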

03. main
```
        while ((c = getopt_long(argc, argv, "d:D:nP:rS:t:v",
                                                        long_options, &option_index)) != -1)
```

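The short-option string (__shortopts) was not updated: 'p', 's' and 'U' are
handled in the switch but are missing from it, and it still contains 'S:'.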
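
For reference, a small standalone sketch of what I mean. The struct and
function names (demo_options, parse_demo_options) are hypothetical and only
three of the options are shown; the point is just that each value used in
long_options also appears in the short-option string, followed by ':' when it
takes an argument.

```c
#include <assert.h>
#include <getopt.h>
#include <stdlib.h>

/* Hypothetical subset of the options, for illustration only. */
struct demo_options
{
	const char *socket_dir;		/* -s / --socket-directory */
	const char *username;		/* -U / --subscriber-username */
	int			port;			/* -p / --subscriber-port */
};

/* Returns 0 on success, -1 on an unknown option or a missing argument. */
int
parse_demo_options(int argc, char **argv, struct demo_options *out)
{
	static struct option long_options[] = {
		{"subscriber-port", required_argument, NULL, 'p'},
		{"socket-directory", required_argument, NULL, 's'},
		{"subscriber-username", required_argument, NULL, 'U'},
		{NULL, 0, NULL, 0}
	};
	int			c;

	assert(out != NULL);

	/* "p:s:U:" mirrors the letters used in long_options above. */
	while ((c = getopt_long(argc, argv, "p:s:U:", long_options, NULL)) != -1)
	{
		switch (c)
		{
			case 'p':
				out->port = atoi(optarg);
				break;
			case 's':
				out->socket_dir = optarg;
				break;
			case 'U':
				out->username = optarg;
				break;
			default:
				/* getopt_long already emitted a complaint */
				return -1;
		}
	}

	return 0;
}
```

With the string from the patch, -p, -s and -U would be rejected as unknown
options even though their long forms work.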

04. main
```
                        case 'D':
                                opt.subscriber_dir = pg_strdup(optarg);
                                canonicalize_path(opt.subscriber_dir);
                                break;
...
                        case 'P':
                                opt.pub_conninfo_str = pg_strdup(optarg);
                                break;
...
                        case 's':
                                opt.socket_dir = pg_strdup(optarg);
                                break;
...
                        case 'U':
                                opt.sub_username = pg_strdup(optarg);
                                break;
```

Should we consider the case where these options are specified twice?
I.e., should we call pg_free() before the assignment?
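
Something like the following would do. This is only a sketch:
replace_option_string() is a hypothetical helper, and plain malloc()/free()
stand in for pg_strdup()/pg_free() so that it compiles outside the tree.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper: store a copy of "value" in *dst and release
 * whatever an earlier occurrence of the same option stored there.
 */
void
replace_option_string(char **dst, const char *value)
{
	size_t		len;
	char	   *copy;

	assert(dst != NULL && value != NULL);

	len = strlen(value) + 1;
	copy = malloc(len);
	if (copy == NULL)
		abort();				/* pg_strdup() also exits on out-of-memory */
	memcpy(copy, value, len);

	free(*dst);					/* free(NULL) is a no-op, so first use is fine */
	*dst = copy;
}
```

Each case in the switch would then become a single call, e.g.
replace_option_string(&opt.socket_dir, optarg), and the last occurrence of an
option wins without leaking the earlier copy.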
 
05. main

canonicalize_path() is not called for socket_dir.

06. main
```
        /*
         * If socket directory is not provided, use the current directory.
         */
```

A one-line comment can be used here. The period can then be removed as well.

07. main
```
        /*
         *
         * If subscriber username is not provided, check if the environment
         * variable sets it. If not, obtain the operating system name of the user
         * running it.
         */
```
Unnecessary blank line at the top of the comment.

08. main
```
                char       *errstr = NULL;
```
 
This declaration can be moved into the else branch.

09. main

Also, in the first place, do we have to obtain the username when it is not
specified? I think libpq can handle that case if we simply do not pass it.

10. main
```
        appendPQExpBuffer(sub_conninfo_str, "host=%s port=%u user=%s fallback_application_name=%s",
                                          opt.socket_dir, opt.sub_port, opt.sub_username, progname);
        sub_base_conninfo = get_base_conninfo(sub_conninfo_str->data, NULL);
```

Is it really necessary to call get_base_conninfo()? I think there is no need to
define sub_base_conninfo.

11. main

```
        /*
         * In dry run mode, the server is restarted with the provided command-line
         * options so validation can be applied in the target server. In order to
         * preserve the initial state of the server (running), start it without
         * the command-line options.
         */
        if (dry_run)
                start_standby_server(&opt, pg_ctl_path, NULL, false);
```

I think the initial state of the server may also be stopped; both states are
now allowed.
Also, I do not think it is good to omit the logfile here.

12. others

As Peter E pointed out [1], the main function is still huge: it has more than
400 lines.
I think every function should be shorter than 100 lines to keep it readable.

I considered splitting it up as below. Note that this may require changing the
order of some steps. What do you think?

* add parse_command_options(), which accepts user options and verifies them
* add verification_phase() or something similar, which checks the system
  identifier and calls check_XXX
* add catchup_phase() or something similar, which creates a temporary slot,
  writes recovery parameters, and waits until the end of recovery
* add cleanup_phase() or something similar, which removes primary_slot and
  modifies the system identifier
* stopping and starting the server can be combined into one wrapper.

The attached txt file is a proof of concept.

13. others

PQresultStatus(res) is called 17 times in this source file, which seems
redundant. I think we can introduce a function like executeQueryOrDie() and
gather the checks in one place.

14. others

I found that pg_createsubscriber does not use functions declared in other
files.
Would it be possible to use them, e.g., those in streamutil.h?

15. others 

While reading the old discussions [2], I saw that Amit suggested keeping the
comment and avoiding the creation of a temporary slot. You said "Got it", but
the temporary slot still exists.
Is there any reason? Can you clarify your opinion?

16. others

While reading [2] and [3], I was confused by the decision. You and Amit
discussed combining pg_createsubscriber with slot sync and how slots on the
physical standby should be handled. You seemed to agree to remove such a slot,
and Amit also suggested raising an ERROR. However, you said in [8] that such
handling is not mandatory, so a WARNING should be raised in dry_run. I was
quite confused.
Am I missing something?

17. others

Per the discussion around [4], we might have to consider the case where options
such as data_directory and config_file were initially specified for the standby
server. Another easy approach is to allow users to specify options, like -o in
pg_upgrade [5], which is similar to your idea. Thoughts?


18. others

How do you handle the reported failure [6]?

19. main
```
        char       *pub_base_conninfo = NULL;
        char       *sub_base_conninfo = NULL;
        char       *dbname_conninfo = NULL;
```

No need to initialize pub_base_conninfo and sub_base_conninfo.
These variables would not be free'd.

20. others

IIUC, slot creation would not finish if there are prepared transactions.
Should we detect this in the verification phase and raise an ERROR?

21. others

As I said in [7], the catch-up would not finish if a long
recovery_min_apply_delay is used. Should we override it during the catch-up?

22. pg_createsubscriber.sgml
```
    <para>
     Check
     Write recovery parameters into the target data...
```

Not sure, but "Check" seems not needed.

[1]: https://www.postgresql.org/message-id/b9aa614c-84ba-a869-582f-8d5e3ab57424%40enterprisedb.com
[2]: https://www.postgresql.org/message-id/9fd3018d-0e5f-4507-aee6-efabfb5a4440%40app.fastmail.com
[3]: https://www.postgresql.org/message-id/CAA4eK1L%2BE-bdKaOMSw-yWizcuprKMyeejyOwWjq_57%3DUqh-f%2Bg%40mail.gmail.com
[4]: https://www.postgresql.org/message-id/TYCPR01MB12077B63D81B49E9DFD323661F55A2%40TYCPR01MB12077.jpnprd01.prod.outlook.com
[5]: https://www.postgresql.org/docs/devel/pgupgrade.html#:~:text=options%20to%20be%20passed%20directly%20to%20the%20old%20postgres%20command%3B%20multiple%20option%20invocations%20are%20appended
[6]: https://www.postgresql.org/message-id/CAHv8Rj%2B5mzK9Jt%2B7ECogJzfm5czvDCCd5jO1_rCx0bTEYpBE5g%40mail.gmail.com
[7]: https://www.postgresql.org/message-id/OS3PR01MB98828B15DD9502C91E0C50D7F57D2%40OS3PR01MB9882.jpnprd01.prod.outlook.com
[8]: https://www.postgresql.org/message-id/be92c57b-82e1-4920-ac31-a8a04206db7b%40app.fastmail.com

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
https://www.fujitsu.com/global/ 

From 5866926dd581881af6b75c41e858125f9427b4e6 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Date: Wed, 6 Mar 2024 06:58:48 +0000
Subject: [PATCH] Shorten main function

---
 src/bin/pg_basebackup/pg_createsubscriber.c | 516 +++++++++++---------
 1 file changed, 281 insertions(+), 235 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index e70fc5dca0..80d76a78ce 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -70,8 +70,7 @@ static PGconn *connect_database(const char *conninfo, bool exit_on_error);
 static void disconnect_database(PGconn *conn, bool exit_on_error);
 static uint64 get_primary_sysid(const char *conninfo);
 static uint64 get_standby_sysid(const char *datadir);
-static void modify_subscriber_sysid(const char *pg_resetwal_path,
-                                                                       struct CreateSubscriberOptions *opt);
+static void modify_subscriber_sysid(struct CreateSubscriberOptions *opt);
 static bool server_is_in_recovery(PGconn *conn);
 static void check_publisher(struct LogicalRepInfo *dbinfo);
 static void setup_publisher(struct LogicalRepInfo *dbinfo);
@@ -86,10 +85,12 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 static char *setup_server_logfile(const char *datadir);
 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
 static void start_standby_server(struct CreateSubscriberOptions *opt,
-                                                                const char *pg_ctl_path, const char *logfile,
+                                                                const char *logfile,
                                                                 bool with_options);
-static void stop_standby_server(const char *pg_ctl_path, const char *datadir);
-static void wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path,
+static void stop_standby_server(const char *datadir);
+static void restart_server(struct CreateSubscriberOptions *options,
+                                                  const char *logfile);
+static void wait_for_end_recovery(const char *conninfo,
                                                                  struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -97,11 +98,20 @@ static void create_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, struct LogicalRepInfo *dbinfo,
                                                                         const char *lsn);
 static void enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void parse_command_option(int argc, char **argv,
+                                                                struct CreateSubscriberOptions *options);
+static void verification_phase(struct CreateSubscriberOptions *options);
+static char *catchup_phase(struct CreateSubscriberOptions *options,
+                                                  char *server_start_log);
+static void cleanup_phase(struct CreateSubscriberOptions *options,
+                                                 char *server_start_log);
 
 #define        USEC_PER_SEC    1000000
 #define        WAIT_INTERVAL   1               /* 1 second */
 
 static const char *progname;
+static const char *pg_ctl_path;
+static const char *pg_resetwal_path;
 
 static char *primary_slot_name = NULL;
 static bool dry_run = false;
@@ -521,7 +531,7 @@ get_standby_sysid(const char *datadir)
  * files from one of the systems might be used in the other one.
  */
 static void
-modify_subscriber_sysid(const char *pg_resetwal_path, struct CreateSubscriberOptions *opt)
+modify_subscriber_sysid(struct CreateSubscriberOptions *opt)
 {
        ControlFileData *cf;
        bool            crc_ok;
@@ -1163,8 +1173,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
 }
 
 static void
-start_standby_server(struct CreateSubscriberOptions *opt, const char *pg_ctl_path,
-                                        const char *logfile, bool with_options)
+start_standby_server(struct CreateSubscriberOptions *opt, const char *logfile,
+                                        bool with_options)
 {
        PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
        char            socket_string[MAXPGPATH + 200];
@@ -1210,7 +1220,7 @@ start_standby_server(struct CreateSubscriberOptions *opt, 
const char *pg_ctl_pat
 }
 
 static void
-stop_standby_server(const char *pg_ctl_path, const char *datadir)
+stop_standby_server(const char *datadir)
 {
        char       *pg_ctl_cmd;
        int                     rc;
@@ -1223,6 +1233,25 @@ stop_standby_server(const char *pg_ctl_path, const char *datadir)
        pg_log_info("server was stopped");
 }
 
+/*
+ * Wrapper for stop_standby_server() and start_standby_server() 
+ */
+static void
+restart_server(struct CreateSubscriberOptions *options, const char *logfile)
+{
+       struct stat statbuf;
+       char            pidfile[MAXPGPATH];
+
+       /* Subscriber PID file */
+       snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", options->subscriber_dir);
+
+       /* If the standby server is running, stop it */
+       if (stat(pidfile, &statbuf) == 0)
+               stop_standby_server(options->subscriber_dir);
+
+       start_standby_server(options, logfile, true);
+}
+
 /*
  * Returns after the server finishes the recovery process.
  *
@@ -1230,7 +1259,7 @@ stop_standby_server(const char *pg_ctl_path, const char 
*datadir)
  * the recovery process. By default, it waits forever.
  */
 static void
-wait_for_end_recovery(const char *conninfo, const char *pg_ctl_path,
+wait_for_end_recovery(const char *conninfo,
                                          struct CreateSubscriberOptions *opt)
 {
        PGconn     *conn;
@@ -1272,7 +1301,7 @@ wait_for_end_recovery(const char *conninfo, const char 
*pg_ctl_path,
                {
                        if (++count > NUM_CONN_ATTEMPTS)
                        {
-                               stop_standby_server(pg_ctl_path, 
opt->subscriber_dir);
+                               stop_standby_server(opt->subscriber_dir);
                                pg_log_error("standby server disconnected from 
the primary");
                                break;
                        }
@@ -1285,7 +1314,7 @@ wait_for_end_recovery(const char *conninfo, const char 
*pg_ctl_path,
                /* Bail out after recovery_timeout seconds if this option is 
set */
                if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
                {
-                       stop_standby_server(pg_ctl_path, opt->subscriber_dir);
+                       stop_standby_server(opt->subscriber_dir);
                        pg_log_error("recovery timed out");
                        disconnect_database(conn, true);
                }
@@ -1581,165 +1610,20 @@ enable_subscription(PGconn *conn, struct 
LogicalRepInfo *dbinfo)
        destroyPQExpBuffer(str);
 }
 
-int
-main(int argc, char **argv)
+/*
+ * Verify the input arguments are appropriate.
+ */
+static void
+verify_input_arguments(struct CreateSubscriberOptions *options)
 {
-       static struct option long_options[] =
-       {
-               {"database", required_argument, NULL, 'd'},
-               {"pgdata", required_argument, NULL, 'D'},
-               {"dry-run", no_argument, NULL, 'n'},
-               {"subscriber-port", required_argument, NULL, 'p'},
-               {"publisher-server", required_argument, NULL, 'P'},
-               {"retain", no_argument, NULL, 'r'},
-               {"socket-directory", required_argument, NULL, 's'},
-               {"recovery-timeout", required_argument, NULL, 't'},
-               {"subscriber-username", required_argument, NULL, 'U'},
-               {"verbose", no_argument, NULL, 'v'},
-               {"version", no_argument, NULL, 'V'},
-               {"help", no_argument, NULL, '?'},
-               {NULL, 0, NULL, 0}
-       };
-
-       struct CreateSubscriberOptions opt = {0};
-
-       int                     c;
-       int                     option_index;
-
-       char       *pg_ctl_path = NULL;
-       char       *pg_resetwal_path = NULL;
-
-       char       *server_start_log;
-
-       char       *pub_base_conninfo = NULL;
-       char       *sub_base_conninfo = NULL;
        char       *dbname_conninfo = NULL;
-
-       uint64          pub_sysid;
-       uint64          sub_sysid;
-       struct stat statbuf;
-
-       PGconn     *conn;
-       char       *consistent_lsn;
-
-       PQExpBuffer sub_conninfo_str = createPQExpBuffer();
-       PQExpBuffer recoveryconfcontents = NULL;
-
-       char            pidfile[MAXPGPATH];
-
-       pg_logging_init(argv[0]);
-       pg_logging_set_level(PG_LOG_WARNING);
-       progname = get_progname(argv[0]);
-       set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
-
-       if (argc > 1)
-       {
-               if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 
0)
-               {
-                       usage();
-                       exit(0);
-               }
-               else if (strcmp(argv[1], "-V") == 0
-                                || strcmp(argv[1], "--version") == 0)
-               {
-                       puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
-                       exit(0);
-               }
-       }
-
-       /* Default settings */
-       opt.subscriber_dir = NULL;
-       opt.pub_conninfo_str = NULL;
-       opt.socket_dir = NULL;
-       opt.sub_port = DEFAULT_SUB_PORT;
-       opt.sub_username = NULL;
-       opt.database_names = (SimpleStringList)
-       {
-               NULL, NULL
-       };
-       opt.retain = false;
-       opt.recovery_timeout = 0;
-
-       /*
-        * Don't allow it to be run as root. It uses pg_ctl which does not allow
-        * it either.
-        */
-#ifndef WIN32
-       if (geteuid() == 0)
-       {
-               pg_log_error("cannot be executed by \"root\"");
-               pg_log_error_hint("You must run %s as the PostgreSQL 
superuser.",
-                                                 progname);
-               exit(1);
-       }
-#endif
-
-       get_restricted_token();
-
-       while ((c = getopt_long(argc, argv, "d:D:nP:rS:t:v",
-                                                       long_options, 
&option_index)) != -1)
-       {
-               switch (c)
-               {
-                       case 'd':
-                               /* Ignore duplicated database names */
-                               if 
(!simple_string_list_member(&opt.database_names, optarg))
-                               {
-                                       
simple_string_list_append(&opt.database_names, optarg);
-                                       num_dbs++;
-                               }
-                               break;
-                       case 'D':
-                               opt.subscriber_dir = pg_strdup(optarg);
-                               canonicalize_path(opt.subscriber_dir);
-                               break;
-                       case 'n':
-                               dry_run = true;
-                               break;
-                       case 'p':
-                               if ((opt.sub_port = atoi(optarg)) <= 0)
-                                       pg_fatal("invalid subscriber port 
number");
-                               break;
-                       case 'P':
-                               opt.pub_conninfo_str = pg_strdup(optarg);
-                               break;
-                       case 'r':
-                               opt.retain = true;
-                               break;
-                       case 's':
-                               opt.socket_dir = pg_strdup(optarg);
-                               break;
-                       case 't':
-                               opt.recovery_timeout = atoi(optarg);
-                               break;
-                       case 'U':
-                               opt.sub_username = pg_strdup(optarg);
-                               break;
-                       case 'v':
-                               pg_logging_increase_verbosity();
-                               break;
-                       default:
-                               /* getopt_long already emitted a complaint */
-                               pg_log_error_hint("Try \"%s --help\" for more 
information.", progname);
-                               exit(1);
-               }
-       }
-
-       /*
-        * Any non-option arguments?
-        */
-       if (optind < argc)
-       {
-               pg_log_error("too many command-line arguments (first is 
\"%s\")",
-                                        argv[optind]);
-               pg_log_error_hint("Try \"%s --help\" for more information.", 
progname);
-               exit(1);
-       }
+       char       *pub_base_conninfo;
+       PQExpBuffer     sub_conninfo_str = createPQExpBuffer();
 
        /*
         * Required arguments
         */
-       if (opt.subscriber_dir == NULL)
+       if (options->subscriber_dir == NULL)
        {
                pg_log_error("no subscriber data directory specified");
                pg_log_error_hint("Try \"%s --help\" for more information.", 
progname);
@@ -1749,14 +1633,14 @@ main(int argc, char **argv)
        /*
         * If socket directory is not provided, use the current directory.
         */
-       if (opt.socket_dir == NULL)
+       if (options->socket_dir == NULL)
        {
                char            cwd[MAXPGPATH];
 
                if (!getcwd(cwd, MAXPGPATH))
                        pg_fatal("could not determine current directory");
-               opt.socket_dir = pg_strdup(cwd);
-               canonicalize_path(opt.socket_dir);
+               options->socket_dir = pg_strdup(cwd);
+               canonicalize_path(options->socket_dir);
        }
 
        /*
@@ -1765,17 +1649,17 @@ main(int argc, char **argv)
         * variable sets it. If not, obtain the operating system name of the 
user
         * running it.
         */
-       if (opt.sub_username == NULL)
+       if (options->sub_username == NULL)
        {
                char       *errstr = NULL;
 
                if (getenv("PGUSER"))
                {
-                       opt.sub_username = getenv("PGUSER");
+                       options->sub_username = getenv("PGUSER");
                }
                else
                {
-                       opt.sub_username = get_user_name(&errstr);
+                       options->sub_username = get_user_name(&errstr);
                        if (errstr)
                                pg_fatal("%s", errstr);
                }
@@ -1785,7 +1669,7 @@ main(int argc, char **argv)
         * Parse connection string. Build a base connection string that might be
         * reused by multiple databases.
         */
-       if (opt.pub_conninfo_str == NULL)
+       if (options->pub_conninfo_str == NULL)
        {
                /*
                 * TODO use primary_conninfo (if available) from subscriber and
@@ -1798,19 +1682,16 @@ main(int argc, char **argv)
                exit(1);
        }
        pg_log_info("validating connection string on publisher");
-       pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
+       pub_base_conninfo = get_base_conninfo(options->pub_conninfo_str,
                                                                                
  &dbname_conninfo);
        if (pub_base_conninfo == NULL)
                exit(1);
 
        pg_log_info("validating connection string on subscriber");
        appendPQExpBuffer(sub_conninfo_str, "host=%s port=%u user=%s 
fallback_application_name=%s",
-                                         opt.socket_dir, opt.sub_port, 
opt.sub_username, progname);
-       sub_base_conninfo = get_base_conninfo(sub_conninfo_str->data, NULL);
-       if (sub_base_conninfo == NULL)
-               exit(1);
+                                         options->socket_dir, 
options->sub_port, options->sub_username, progname);
 
-       if (opt.database_names.head == NULL)
+       if (options->database_names.head == NULL)
        {
                pg_log_info("no database was specified");
 
@@ -1821,7 +1702,7 @@ main(int argc, char **argv)
                 */
                if (dbname_conninfo)
                {
-                       simple_string_list_append(&opt.database_names, 
dbname_conninfo);
+                       simple_string_list_append(&options->database_names, 
dbname_conninfo);
                        num_dbs++;
 
                        pg_log_info("database \"%s\" was extracted from the 
publisher connection string",
@@ -1836,58 +1717,134 @@ main(int argc, char **argv)
                }
        }
 
-       /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
-       pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
-       pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
-
        /* Rudimentary check for a data directory */
-       check_data_directory(opt.subscriber_dir);
+       check_data_directory(options->subscriber_dir);
 
        /*
         * Store database information for publisher and subscriber. It should be
         * called before atexit() because its return is used in the
         * cleanup_objects_atexit().
         */
-       dbinfo = store_pub_sub_info(opt.database_names, pub_base_conninfo,
-                                                               
sub_base_conninfo);
+       dbinfo = store_pub_sub_info(options->database_names, pub_base_conninfo,
+                                                               
sub_conninfo_str->data);
 
-       /* Register a function to clean up objects in case of failure */
-       atexit(cleanup_objects_atexit);
+       pfree(dbname_conninfo);
+       pfree(pub_base_conninfo);
+       destroyPQExpBuffer(sub_conninfo_str);
+}
 
-       /*
-        * Check if the subscriber data directory has the same system identifier
-        * than the publisher data directory.
-        */
-       pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
-       sub_sysid = get_standby_sysid(opt.subscriber_dir);
-       if (pub_sysid != sub_sysid)
-               pg_fatal("subscriber data directory is not a copy of the source 
database cluster");
+/*
+ * Parse command-line options and store into CreateSubscriberOptions.
+ */
+static void
+parse_command_option(int argc, char **argv, struct CreateSubscriberOptions 
*options)
+{
+       static struct option long_options[] =
+       {
+               {"database", required_argument, NULL, 'd'},
+               {"pgdata", required_argument, NULL, 'D'},
+               {"dry-run", no_argument, NULL, 'n'},
+               {"subscriber-port", required_argument, NULL, 'p'},
+               {"publisher-server", required_argument, NULL, 'P'},
+               {"retain", no_argument, NULL, 'r'},
+               {"socket-directory", required_argument, NULL, 's'},
+               {"recovery-timeout", required_argument, NULL, 't'},
+               {"subscriber-username", required_argument, NULL, 'U'},
+               {"verbose", no_argument, NULL, 'v'},
+               {"version", no_argument, NULL, 'V'},
+               {"help", no_argument, NULL, '?'},
+               {NULL, 0, NULL, 0}
+       };
 
-       /* Create the output directory to store any data generated by this tool 
*/
-       server_start_log = setup_server_logfile(opt.subscriber_dir);
+       int             c;
+       int             option_index;
 
-       /* Subscriber PID file */
-       snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", opt.subscriber_dir);
+       get_restricted_token();
+
+       while ((c = getopt_long(argc, argv, "d:D:nP:rS:t:v",
+                                                       long_options, 
&option_index)) != -1)
+       {
+               switch (c)
+               {
+                       case 'd':
+                               /* Ignore duplicated database names */
+                               if 
(!simple_string_list_member(&options->database_names, optarg))
+                               {
+                                       
simple_string_list_append(&options->database_names, optarg);
+                                       num_dbs++;
+                               }
+                               break;
+                       case 'D':
+                               options->subscriber_dir = pg_strdup(optarg);
+                               canonicalize_path(options->subscriber_dir);
+                               break;
+                       case 'n':
+                               dry_run = true;
+                               break;
+                       case 'p':
+                               if ((options->sub_port = atoi(optarg)) <= 0)
+                                       pg_fatal("invalid subscriber port 
number");
+                               break;
+                       case 'P':
+                               options->pub_conninfo_str = pg_strdup(optarg);
+                               break;
+                       case 'r':
+                               options->retain = true;
+                               break;
+                       case 's':
+                               options->socket_dir = pg_strdup(optarg);
+                               break;
+                       case 't':
+                               options->recovery_timeout = atoi(optarg);
+                               break;
+                       case 'U':
+                               options->sub_username = pg_strdup(optarg);
+                               break;
+                       case 'v':
+                               pg_logging_increase_verbosity();
+                               break;
+                       default:
+                               /* getopt_long already emitted a complaint */
+                               pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+                               exit(1);
+               }
+       }
 
        /*
-        * If the standby server is running, stop it. Some parameters (that can
-        * only be set at server start) are informed by command-line options.
+        * Any non-option arguments?
         */
-       if (stat(pidfile, &statbuf) == 0)
+       if (optind < argc)
        {
-
-               pg_log_info("standby is up and running");
-               pg_log_info("stopping the server to start the transformation steps");
-               stop_standby_server(pg_ctl_path, opt.subscriber_dir);
+               pg_log_error("too many command-line arguments (first is \"%s\")",
+                                        argv[optind]);
+               pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+               exit(1);
        }
 
+       verify_input_arguments(options);
+
+       /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
+       pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
+       pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
+}
+
+/*
+ * Check whether nodes can be a logical replication cluster
+ */
+static void
+verification_phase(struct CreateSubscriberOptions *options)
+{
+       uint64 pub_sysid;
+       uint64 sub_sysid;
+
        /*
-        * Start a short-lived standby server with temporary parameters (provided
-        * by command-line options). The goal is to avoid connections during the
-        * transformation steps.
+        * Check if the subscriber data directory has the same system identifier
+        * as the publisher data directory.
         */
-       pg_log_info("starting the standby with command-line options");
-       start_standby_server(&opt, pg_ctl_path, server_start_log, true);
+       pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
+       sub_sysid = get_standby_sysid(options->subscriber_dir);
+       if (pub_sysid != sub_sysid)
+               pg_fatal("subscriber data directory is not a copy of the source database cluster");
 
        /* Check if the standby server is ready for logical replication */
        check_subscriber(dbinfo);
@@ -1899,14 +1856,17 @@ main(int argc, char **argv)
         * called after it.
         */
        check_publisher(dbinfo);
+}
 
-       /*
-        * Create the required objects for each database on publisher. This step
-        * is here mainly because if we stop the standby we cannot verify if the
-        * primary slot is in use. We could use an extra connection for it but it
-        * doesn't seem worth.
-        */
-       setup_publisher(dbinfo);
+/*
+ * Ensure the target server is caught up to the primary
+ */
+static char *
+catchup_phase(struct CreateSubscriberOptions *options, char *server_start_log)
+{
+       PGconn     *conn;
+       char       *consistent_lsn;
+       PQExpBuffer recoveryconfcontents = NULL;
 
        /*
         * Create a temporary logical replication slot to get a consistent LSN.
@@ -1959,7 +1919,7 @@ main(int argc, char **argv)
        {
                appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
                                                  consistent_lsn);
-               WriteRecoveryConfig(conn, opt.subscriber_dir, recoveryconfcontents);
+               WriteRecoveryConfig(conn, options->subscriber_dir, recoveryconfcontents);
        }
        disconnect_database(conn, false);
 
@@ -1970,20 +1930,18 @@ main(int argc, char **argv)
         * until accepting connections.
         */
        pg_log_info("stopping and starting the subscriber");
-       stop_standby_server(pg_ctl_path, opt.subscriber_dir);
-       start_standby_server(&opt, pg_ctl_path, server_start_log, true);
+       restart_server(options, server_start_log);
 
        /* Waiting the subscriber to be promoted */
-       wait_for_end_recovery(dbinfo[0].subconninfo, pg_ctl_path, &opt);
+       wait_for_end_recovery(dbinfo[0].subconninfo, options);
 
-       /*
-        * Create the subscription for each database on subscriber. It does not
-        * enable it immediately because it needs to adjust the logical
-        * replication start point to the LSN reported by consistent_lsn (see
-        * set_replication_progress). It also cleans up publications created by
-        * this tool and replication to the standby.
-        */
-       setup_subscriber(dbinfo, consistent_lsn);
+       return consistent_lsn;
+}
+
+static void
+cleanup_phase(struct CreateSubscriberOptions *options, char *server_start_log)
+{
+       PGconn     *conn;
 
        /*
         * If the primary_slot_name exists on primary, drop it.
@@ -2009,10 +1967,10 @@ main(int argc, char **argv)
 
        /* Stop the subscriber */
        pg_log_info("stopping the subscriber");
-       stop_standby_server(pg_ctl_path, opt.subscriber_dir);
+       stop_standby_server(options->subscriber_dir);
 
        /* Change system identifier from subscriber */
-       modify_subscriber_sysid(pg_resetwal_path, &opt);
+       modify_subscriber_sysid(options);
 
        /*
         * In dry run mode, the server is restarted with the provided command-line
@@ -2021,14 +1979,102 @@ main(int argc, char **argv)
         * the command-line options.
         */
        if (dry_run)
-               start_standby_server(&opt, pg_ctl_path, NULL, false);
+               start_standby_server(options, NULL, false);
 
        /*
         * The log file is kept if retain option is specified or this tool does
         * not run successfully. Otherwise, log file is removed.
         */
-       if (!dry_run && !opt.retain)
+       if (!dry_run && !options->retain)
                unlink(server_start_log);
+}
+
+int
+main(int argc, char **argv)
+{
+       struct  CreateSubscriberOptions opt = {0};
+       char   *server_start_log;
+       char   *consistent_lsn;
+
+       pg_logging_init(argv[0]);
+       pg_logging_set_level(PG_LOG_WARNING);
+       progname = get_progname(argv[0]);
+       set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
+
+       if (argc > 1)
+       {
+               if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+               {
+                       usage();
+                       exit(0);
+               }
+               else if (strcmp(argv[1], "-V") == 0
+                                || strcmp(argv[1], "--version") == 0)
+               {
+                       puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
+                       exit(0);
+               }
+       }
+
+       /* Default settings */
+       opt.subscriber_dir = NULL;
+       opt.pub_conninfo_str = NULL;
+       opt.socket_dir = NULL;
+       opt.sub_port = DEFAULT_SUB_PORT;
+       opt.sub_username = NULL;
+       opt.database_names = (SimpleStringList)
+       {
+               NULL, NULL
+       };
+       opt.retain = false;
+       opt.recovery_timeout = 0;
+
+       /*
+        * Don't allow it to be run as root. It uses pg_ctl which does not allow
+        * it either.
+        */
+#ifndef WIN32
+       if (geteuid() == 0)
+       {
+               pg_log_error("cannot be executed by \"root\"");
+               pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
+                                                 progname);
+               exit(1);
+       }
+#endif
+
+       parse_command_option(argc, argv, &opt);
+
+       /* Create the output directory to store any data generated by this tool */
+       server_start_log = setup_server_logfile(opt.subscriber_dir);
+
+       restart_server(&opt, server_start_log);
+
+       verification_phase(&opt);
+
+       /* Register a function to clean up objects in case of failure */
+       atexit(cleanup_objects_atexit);
+
+       /*
+        * Create the required objects for each database on publisher. This step
+        * is here mainly because if we stop the standby we cannot verify if the
+        * primary slot is in use. We could use an extra connection for it but it
+        * doesn't seem worth it.
+        */
+       setup_publisher(dbinfo);
+
+       consistent_lsn = catchup_phase(&opt, server_start_log);
+
+       /*
+        * Create the subscription for each database on subscriber. It does not
+        * enable it immediately because it needs to adjust the logical
+        * replication start point to the LSN reported by consistent_lsn (see
+        * set_replication_progress). It also cleans up publications created by
+        * this tool and replication to the standby.
+        */
+       setup_subscriber(dbinfo, consistent_lsn);
+
+       cleanup_phase(&opt, server_start_log);
 
        success = true;
 
-- 
2.43.0
