*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 66,71 **** PostgreSQL documentation
--- 66,78 ----
as possible. To avoid this behavior, use the -n
parameter.
+
+
+ Synchronous mode offers the ability to confirm WAL have been streamed
+ in the same way as synchronous replication. To use synchronous mode,
+ set up synchronous replication as described in
+ , and set parameter(that is, -m and --slot).
+
***************
*** 106,111 **** PostgreSQL documentation
--- 113,129 ----
+
+
+
+
+ Enables synchronous mode. If replication slot is disabled then
+ this setting is irrelevant.
+
+
+
+
+
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL))
/*
* Any errors will already have been reported in the function process,
--- 370,376 ----
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, 0))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 35,40 ****
--- 35,41 ----
static char *basedir = NULL;
static int verbose = 0;
static int noloop = 0;
+ static int syncmode = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static volatile bool time_to_abort = false;
***************
*** 62,67 **** usage(void)
--- 63,69 ----
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
+ printf(_(" -m, --sync-mode synchronous mode\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial");
PQfinish(conn);
}
--- 332,338 ----
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial", syncmode);
PQfinish(conn);
}
***************
*** 360,365 **** main(int argc, char **argv)
--- 362,368 ----
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
+ {"sync-mode", no_argument, NULL, 'm'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
long_options, &option_index)) != -1)
{
switch (c)
--- 392,398 ----
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 439,447 ----
case 'n':
noloop = 1;
break;
+ case 'm':
+ syncmode = 1;
+ break;
case 'v':
verbose++;
break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
--- 34,40 ----
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos, int syncmode);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
***************
*** 417,429 **** CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix)
{
char query[128];
char slotcmd[128];
--- 417,432 ----
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
+ * If 'syncmode' is not zero, synchronous mode. Flush is executed after all
+ * received WAL is written, and reply flush position.
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix, int syncmode)
{
char query[128];
char slotcmd[128];
***************
*** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos);
if (res == NULL)
goto error;
--- 571,577 ----
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, syncmode);
if (res == NULL)
goto error;
***************
*** 729,740 **** static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
XLogRecPtr blockpos = startpos;
bool still_sending = true;
while (1)
{
--- 732,744 ----
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int syncmode)
{
char *copybuf = NULL;
int64 last_status = -1;
XLogRecPtr blockpos = startpos;
bool still_sending = true;
+ int flush_flg = 0;
while (1)
{
***************
*** 787,839 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
r = PQgetCopyData(conn, ©buf, 1);
if (r == 0)
{
! /*
! * No data available. Wait for some to appear, but not longer than
! * the specified timeout, so that we can ping the server.
! */
! fd_set input_mask;
! struct timeval timeout;
! struct timeval *timeoutptr;
!
! FD_ZERO(&input_mask);
! FD_SET(PQsocket(conn), &input_mask);
! if (standby_message_timeout && still_sending)
{
! int64 targettime;
! long secs;
! int usecs;
!
! targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
! feTimestampDifference(now,
! targettime,
! &secs,
! &usecs);
! if (secs <= 0)
! timeout.tv_sec = 1; /* Always sleep at least 1 sec */
! else
! timeout.tv_sec = secs;
! timeout.tv_usec = usecs;
! timeoutptr = &timeout;
}
! else
! timeoutptr = NULL;
!
! r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
! if (r == 0 || (r < 0 && errno == EINTR))
{
/*
! * Got a timeout or signal. Continue the loop and either
! * deliver a status packet to the server or just go back into
! * blocking.
*/
! continue;
! }
! else if (r < 0)
! {
! fprintf(stderr, _("%s: select() failed: %s\n"),
! progname, strerror(errno));
! goto error;
}
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
--- 791,864 ----
r = PQgetCopyData(conn, ©buf, 1);
if (r == 0)
{
! if (flush_flg == 2)
{
! if (walfile != -1)
! {
! if (fsync(walfile) != 0)
! {
! fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! progname, current_walfile_name, strerror(errno));
! goto error;
! }
! lastFlushPosition = blockpos;
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
! }
! flush_flg = 0;
}
! if (flush_flg == 0 || !syncmode)
{
/*
! * No data available. Wait for some to appear, but not longer than
! * the specified timeout, so that we can ping the server.
*/
! fd_set input_mask;
! struct timeval timeout;
! struct timeval *timeoutptr;
!
! FD_ZERO(&input_mask);
! FD_SET(PQsocket(conn), &input_mask);
! if (standby_message_timeout && still_sending)
! {
! int64 targettime;
! long secs;
! int usecs;
!
! targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
! feTimestampDifference(now,
! targettime,
! &secs,
! &usecs);
! if (secs <= 0)
! timeout.tv_sec = 1; /* Always sleep at least 1 sec */
! else
! timeout.tv_sec = secs;
! timeout.tv_usec = usecs;
! timeoutptr = &timeout;
! }
! else
! timeoutptr = NULL;
!
! r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
! if (r == 0 || (r < 0 && errno == EINTR))
! {
! /*
! * Got a timeout or signal. Continue the loop and either
! * deliver a status packet to the server or just go back into
! * blocking.
! */
! continue;
! }
! else if (r < 0)
! {
! fprintf(stderr, _("%s: select() failed: %s\n"),
! progname, strerror(errno));
! goto error;
! }
}
+ else
+ flush_flg++;
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
***************
*** 1041,1046 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 1066,1073 ----
}
}
}
+ if(syncmode)
+ flush_flg = 1;
/* No more data left to write, receive next copy packet */
}
else
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix);
--- 16,20 ----
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix,
! int syncmode);