On 12/02/2013 02:23 PM, Boszormenyi Zoltan wrote:
> Hi,
> 
> I am reviewing your patch.

Thanks. New version attached.

> 
> * Does it follow the project coding guidelines?
> 
> Yes. A nitpicking: this else branch below might need brackets
> because there is also a comment in that branch:
> 
> +                       /* The 'real data' starts now (header was ignored). */
> +                       throttled_last = GetCurrentIntegerTimestamp();
> +               }
> +               else
> +                       /* Disable throttling. */
> +                       throttling_counter = -1;
> +

Done.

> 
> * Does it do what it says, correctly?
> 
> Yes.
> 
> Although it should be mentioned in the docs that rate limiting
> applies to walsenders individually, not globally. I tried this
> on a freshly created database:
> 
> $ time pg_basebackup -D data2 -r 1M -X stream -h 127.0.0.1
> real    0m26.508s
> user    0m0.054s
> sys    0m0.360s
> 
> The source database had 3 WAL files in pg_xlog, one of them was
> also streamed. The final size of "data2" was 43MB or 26MB without pg_xlog,
> i.e. without the "-X stream" option. The backup used 2 walsenders
> in parallel (one for WAL) which is a known feature.

Right, if the method is 'stream', a separate WAL sender is used and the
transfer is not limited: the client must keep up with the stream
unconditionally. I added a note to the documentation.

But there's no reason not to throttle WAL files if the method is
'fetch'. It's fixed now.

> Another note. This chunk should be submitted separately as a comment bugfix:
> 
> diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
> index c3c71b7..5736fd8 100644
> --- a/src/backend/utils/adt/timestamp.c
> +++ b/src/backend/utils/adt/timestamp.c
> @@ -1288,7 +1288,7 @@ GetCurrentTimestamp(void)
>   /*
>    * GetCurrentIntegerTimestamp -- get the current operating system time as int64
>    *
> - * Result is the number of milliseconds since the Postgres epoch. If compiled
> + * Result is the number of microseconds since the Postgres epoch. If compiled
>    * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(),
>    * and is implemented as a macro.
>    */

Will do.

// Tony

diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index c379df5..e878592 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -189,6 +189,26 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-r</option></term>
+      <term><option>--max-rate</option></term>
+      <listitem>
+       <para>
+        The maximum amount of data transferred from the server per second.
+        The purpose is to limit the impact of <application>pg_basebackup</application>
+        on a running master server.
+       </para>
+       <para>
+        This option always affects transfer of the data directory. Transfer of
+        WAL files is only affected if the collection method is <literal>fetch</literal>.
+       </para>
+       <para>
+        Suffixes <literal>k</literal> (kilobytes) and <literal>M</literal>
+        (megabytes) are accepted. For example: <literal>10M</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-R</option></term>
       <term><option>--write-recovery-conf</option></term>
       <listitem>
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index ba8d173..f194746 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -33,6 +33,7 @@
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
+#include "utils/timestamp.h"
 #include "pgtar.h"
 
 typedef struct
@@ -42,6 +43,7 @@ typedef struct
 	bool		fastcheckpoint;
 	bool		nowait;
 	bool		includewal;
+	uint32		maxrate;
 } basebackup_options;
 
 
@@ -59,6 +61,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int	compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
 
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
@@ -68,6 +71,41 @@ static bool backup_started_in_recovery = false;
  */
 #define TAR_SEND_SIZE 32768
 
+
+/*
+ * The maximum amount of data per second - bounds of the user input.
+ *
+ * If the maximum should be increased to more than 4 GB, uint64 must
+ * be introduced for the related variables. However such high values have
+ * little to do with throttling.
+ */
+#define MAX_RATE_LOWER	32768
+#define MAX_RATE_UPPER	(1024 << 20)
+
+/*
+ * Transfer rate is only measured when this number of bytes has been sent.
+ * (Too frequent checks would impose too high CPU overhead.)
+ *
+ * The default value is used unless it'd result in too frequent checks.
+ */
+#define THROTTLING_SAMPLE_MIN	32768
+
+/* The maximum number of checks per second.  */
+#define THROTTLING_MAX_FREQUENCY	128
+
+/* The actual sample size: transferring this many bytes may trigger a sleep. */
+static uint32 throttling_sample;
+
+/* Amount of data already transferred but not yet throttled.  */
+static int32 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
+
 typedef struct
 {
 	char	   *oid;
@@ -171,6 +209,33 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 		/* Send tablespace header */
 		SendBackupHeader(tablespaces);
 
+		if (opt->maxrate > 0)
+		{
+			throttling_sample = opt->maxrate / THROTTLING_MAX_FREQUENCY;
+
+			/* Don't measure too small pieces of data. */
+			if (throttling_sample < THROTTLING_SAMPLE_MIN)
+				throttling_sample = THROTTLING_SAMPLE_MIN;
+
+			/*
+			 * opt->maxrate is bytes per second. Thus the expression in
+			 * brackets is microseconds per byte.
+			 */
+			elapsed_min_unit = throttling_sample *
+				((double) USECS_PER_SEC / opt->maxrate);
+
+			/* Enable throttling. */
+			throttling_counter = 0;
+
+			/* The 'real data' starts now (header was ignored). */
+			throttled_last = GetCurrentIntegerTimestamp();
+		}
+		else
+		{
+			/* Disable throttling. */
+			throttling_counter = -1;
+		}
+
 		/* Send off our tablespaces one by one */
 		foreach(lc, tablespaces)
 		{
@@ -398,6 +463,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 							(errmsg("base backup could not send data, aborting backup")));
 
 				len += cnt;
+				throttle(cnt);
+
 				if (len == XLogSegSize)
 					break;
 			}
@@ -468,6 +535,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_fast = false;
 	bool		o_nowait = false;
 	bool		o_wal = false;
+	bool		o_maxrate = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -519,6 +587,29 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			opt->includewal = true;
 			o_wal = true;
 		}
+		else if (strcmp(defel->defname, "maxrate") == 0)
+		{
+			long		maxrate;
+
+			if (o_maxrate)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			maxrate = intVal(defel->arg);
+
+			opt->maxrate = (uint32) maxrate;
+			if (opt->maxrate > 0 &&
+			(opt->maxrate < MAX_RATE_LOWER || opt->maxrate > MAX_RATE_UPPER))
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				  errmsg("transfer rate %u bytes per second is out of range",
+						 opt->maxrate),
+				 errhint("The accepted range is %u through %u kB per second",
+						 MAX_RATE_LOWER >> 10, MAX_RATE_UPPER >> 10)));
+			}
+			o_maxrate = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -1019,6 +1110,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 			   (errmsg("base backup could not send data, aborting backup")));
 
 		len += cnt;
+		throttle(cnt);
 
 		if (len >= statbuf->st_size)
 		{
@@ -1040,10 +1132,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 			cnt = Min(sizeof(buf), statbuf->st_size - len);
 			pq_putmessage('d', buf, cnt);
 			len += cnt;
+			throttle(cnt);
 		}
 	}
 
-	/* Pad to 512 byte boundary, per tar format requirements */
+	/*
+	 * Pad to 512 byte boundary, per tar format requirements. (This small
+	 * piece of data is probably not worth throttling.)
+	 */
 	pad = ((len + 511) & ~511) - len;
 	if (pad > 0)
 	{
@@ -1069,3 +1165,51 @@ _tarWriteHeader(const char *filename, const char *linktarget,
 
 	pq_putmessage('d', h, 512);
 }
+
+static void
+throttle(size_t increment)
+{
+	int64		elapsed,
+				elapsed_min,
+				sleep;
+
+	if (throttling_counter < 0)
+		return;
+
+	throttling_counter += increment;
+	if (throttling_counter < throttling_sample)
+		return;
+
+	/* Time elapsed since the last measurement (and possible wake up). */
+	elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+	/* How much should have elapsed at minimum? */
+	elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+	sleep = elapsed_min - elapsed;
+	/* Only sleep if the transfer is faster than it should be. */
+	if (sleep > 0)
+	{
+
+		/*
+		 * THROTTLING_SAMPLE_MIN / MAX_RATE_LOWER (in seconds) should be the
+		 * longest possible time to sleep. Thus the cast to long is safe.
+		 */
+		pg_usleep((long) sleep);
+	}
+	else
+	{
+
+		/*
+		 * The actual transfer rate is below the limit. Negative value would
+		 * distort the adjustment of throttled_last.
+		 */
+		sleep = 0;
+	}
+
+	/*
+	 * Only the whole multiples of throttling_sample were processed. The rest will
+	 * be done during the next call of this function.
+	 */
+	throttling_counter %= throttling_sample;
+	/* Once the (possible) sleep ends, new period starts. */
+	throttled_last += elapsed + sleep;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 8c83780..1c2c31c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -78,6 +78,7 @@ Node *replication_parse_result;
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
+%token K_MAX_RATE
 %token K_WAL
 %token K_TIMELINE
 
@@ -116,7 +117,7 @@ identify_system:
 			;
 
 /*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
  */
 base_backup:
 			K_BASE_BACKUP base_backup_opt_list
@@ -156,6 +157,11 @@ base_backup_opt:
 				  $$ = makeDefElem("nowait",
 						   (Node *)makeInteger(TRUE));
 				}
+			| K_MAX_RATE UCONST
+				{
+				  $$ = makeDefElem("maxrate",
+						   (Node *)makeInteger($2));
+				}
 			;
 
 /*
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 3d930f1..b2d5e3b 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -71,6 +71,7 @@ IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
 PROGRESS			{ return K_PROGRESS; }
+MAX_RATE		{ return K_MAX_RATE; }
 WAL			{ return K_WAL; }
 TIMELINE			{ return K_TIMELINE; }
 START_REPLICATION	{ return K_START_REPLICATION; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 6706c0c..7a86cd7 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -46,6 +46,7 @@ bool		streamwal = false;
 bool		fastcheckpoint = false;
 bool		writerecoveryconf = false;
 int			standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+uint32		maxrate = 0;		/* No limit by default. */
 
 /* Progress counters */
 static uint64 totalsize;
@@ -76,6 +77,7 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, const char *filename);
+static uint32 parse_max_rate(char *src);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
@@ -111,6 +113,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+	printf(_("  -r, --max-rate         maximum transfer rate to transfer data directory\n"));
 	printf(_("  -R, --write-recovery-conf\n"
 			 "                         write recovery.conf after backup\n"));
 	printf(_("  -x, --xlog             include required WAL files in backup (fetch mode)\n"));
@@ -476,6 +479,94 @@ progress_report(int tablespacenum, const char *filename)
 }
 
 
+static uint32
+parse_max_rate(char *src)
+{
+	int			factor;
+	char	   *after_num;
+	int64		result;
+	int			errno_copy;
+
+	result = strtol(src, &after_num, 0);
+	errno_copy = errno;
+	if (src == after_num)
+	{
+		fprintf(stderr, _("%s: transfer rate \"%s\" is not a valid integer value\n"), progname, src);
+		exit(1);
+	}
+
+
+	/*
+	 * Evaluate (optional) suffix.
+	 *
+	 * after_num should now be right behind the numeric value.
+	 */
+	factor = 1;
+	switch (*after_num)
+	{
+			/*
+			 * Only the following suffixes are allowed. It's not too useful to
+			 * restrict the rate to gigabytes: such a rate will probably bring
+			 * significant impact on the master anyway, so the throttling
+			 * won't help much.
+			 */
+		case 'M':
+			factor <<= 10;
+		case 'k':
+			factor <<= 10;
+			after_num++;
+			break;
+
+		default:
+
+			/*
+			 * If there's no suffix, byte is the unit. Possible invalid letter
+			 * will make conversion to integer fail.
+			 */
+			break;
+	}
+
+	/* The rest can only consist of white space. */
+	while (*after_num != '\0')
+	{
+		if (!isspace(*after_num))
+		{
+			fprintf(stderr, _("%s: invalid transfer rate \"%s\"\n"), progname, src);
+			exit(1);
+		}
+		after_num++;
+	}
+
+	/* Some checks only make sense when we know that the suffix is correct. */
+	if (result <= 0)
+	{
+		/*
+		 * Reject obviously wrong values here. Exact check of the range to be
+		 * done on server.
+		 */
+		fprintf(stderr, _("%s: transfer must be greater than zero\n"), progname);
+		exit(1);
+	}
+	if (errno_copy == ERANGE || result != (uint64) ((uint32) result))
+	{
+		fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+		exit(1);
+	}
+
+	if (factor > 1)
+	{
+		result *= factor;
+		/* Check the integer range once again. */
+		if (result != (uint64) ((uint32) result))
+		{
+			fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+			exit(1);
+		}
+	}
+	return (uint32) result;
+}
+
+
 /*
  * Write a piece of tar data
  */
@@ -1310,6 +1401,7 @@ BaseBackup(void)
 	uint32		starttli;
 	char		current_path[MAXPGPATH];
 	char		escaped_label[MAXPGPATH];
+	char		maxrate_clause[MAXPGPATH];
 	int			i;
 	char		xlogstart[64];
 	char		xlogend[64];
@@ -1382,13 +1474,18 @@ BaseBackup(void)
 	 * Start the actual backup
 	 */
 	PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
+	if (maxrate > 0)
+		snprintf(maxrate_clause, sizeof(maxrate_clause), "MAX_RATE %u", maxrate);
+	else
+		maxrate_clause[0] = '\0';
 	snprintf(current_path, sizeof(current_path),
-			 "BASE_BACKUP LABEL '%s' %s %s %s %s",
+			 "BASE_BACKUP LABEL '%s' %s %s %s %s %s",
 			 escaped_label,
 			 showprogress ? "PROGRESS" : "",
 			 includewal && !streamwal ? "WAL" : "",
 			 fastcheckpoint ? "FAST" : "",
-			 includewal ? "NOWAIT" : "");
+			 includewal ? "NOWAIT" : "",
+			 maxrate_clause);
 
 	if (PQsendQuery(conn, current_path) == 0)
 	{
@@ -1657,6 +1754,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
+		{"max-rate", required_argument, NULL, 'r'},
 		{"write-recovery-conf", no_argument, NULL, 'R'},
 		{"xlog", no_argument, NULL, 'x'},
 		{"xlog-method", required_argument, NULL, 'X'},
@@ -1697,7 +1795,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:F:RxX:l:zZ:d:c:h:p:U:s:wWvP",
+	while ((c = getopt_long(argc, argv, "D:F:r:RxX:l:zZ:d:c:h:p:U:s:wWvP",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1718,6 +1816,9 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'r':
+				maxrate = parse_max_rate(optarg);
+				break;
 			case 'R':
 				writerecoveryconf = true;
 				break;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to