On Tue, Jan 19, 2010 at 12:20 AM, Heikki Linnakangas
<[email protected]> wrote:
> Tom Lane wrote:
>> Heikki Linnakangas <[email protected]> writes:
>>> Simon Riggs wrote:
>>>> Do we need a new record type for that, is there a handy record type to
>>>> bounce from?
>>
>>> After starting streaming, slices of WAL are sent as CopyData messages.
>>> The CopyData payload begins with an XLogRecPtr, followed by the WAL
>>> data. That payload format needs to be extended with a 'message type'
>>> field and a new message type for the timestamps needs to be added.
>>
>> Whether or not anyone bothers with the timestamp message, I think adding
>> a message type header is a Must Fix item. A protocol with no provision
>> for extension is certainly going to bite us in the rear before long.
>
> Agreed a message type header is a good idea, although we don't expect
> streaming replication and the protocol to work across different major
> versions anyway.
The attached patch adds a message type header to the payload of each
CopyData message sent from walsender to walreceiver, to make the
replication protocol more extensible.
Regards,
--
Fujii Masao
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 4179,4190 **** The commands accepted in walsender mode are:
already been recycled. On success, server responds with a
CopyOutResponse message, and backend starts to stream WAL as CopyData
messages.
</para>
<para>
! The payload in each CopyData message consists of an XLogRecPtr,
! indicating the starting point of the WAL in the message, immediately
! followed by the WAL data itself.
</para>
<para>
A single WAL record is never split across two CopyData messages. When
--- 4179,4243 ----
already been recycled. On success, server responds with a
CopyOutResponse message, and backend starts to stream WAL as CopyData
messages.
+ The payload of each CopyData message has the following format:
</para>
<para>
! <variablelist>
! <varlistentry>
! <term>
! XLogData (B)
! </term>
! <listitem>
! <para>
! <variablelist>
! <varlistentry>
! <term>
! Byte1('w')
! </term>
! <listitem>
! <para>
! Identifies the message as WAL data.
! </para>
! </listitem>
! </varlistentry>
! <varlistentry>
! <term>
! Int32
! </term>
! <listitem>
! <para>
! The log file number of the LSN, indicating the starting point of
! the WAL in the message.
! </para>
! </listitem>
! </varlistentry>
! <varlistentry>
! <term>
! Int32
! </term>
! <listitem>
! <para>
! The byte offset of the LSN, indicating the starting point of
! the WAL in the message.
! </para>
! </listitem>
! </varlistentry>
! <varlistentry>
! <term>
! Byte<replaceable>n</replaceable>
! </term>
! <listitem>
! <para>
! Data that forms part of the WAL data stream.
! </para>
! </listitem>
! </varlistentry>
! </variablelist>
! </para>
! </listitem>
! </varlistentry>
! </variablelist>
</para>
<para>
A single WAL record is never split across two CopyData messages. When
*** a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
--- b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
***************
*** 48,55 **** static char *recvBuf = NULL;
/* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
! static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
! int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
--- 48,55 ----
/* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
! static bool libpqrcv_receive(int timeout, unsigned char *type,
! char **buffer, int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
***************
*** 236,248 **** libpqrcv_disconnect(void)
}
/*
! * Receive any WAL records available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* Returns:
*
! * True if data was received. *recptr, *buffer and *len are set to
! * the WAL location of the received data, buffer holding it, and length,
* respectively.
*
* False if no data was available within timeout, or wait was interrupted
--- 236,248 ----
}
/*
! * Receive any messages available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* Returns:
*
! * True if data was received. *type, *buffer and *len are set to
! * the type of the received data, buffer holding it, and length,
* respectively.
*
* False if no data was available within timeout, or wait was interrupted
***************
*** 254,260 **** libpqrcv_disconnect(void)
* ereports on error.
*/
static bool
! libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
{
int rawlen;
--- 254,260 ----
* ereports on error.
*/
static bool
! libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
{
int rawlen;
***************
*** 275,288 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
! (errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
}
justconnected = false;
/* Receive CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
! if (rawlen == 0) /* no records available yet, then return */
return false;
if (rawlen == -1) /* end-of-streaming or error */
{
--- 275,288 ----
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
! (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
}
justconnected = false;
/* Receive CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
! if (rawlen == 0) /* no data available yet, then return */
return false;
if (rawlen == -1) /* end-of-streaming or error */
{
***************
*** 297,318 **** libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
}
PQclear(res);
ereport(ERROR,
! (errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
! (errmsg("could not read xlog records: %s",
PQerrorMessage(streamConn))));
! if (rawlen < sizeof(XLogRecPtr))
! ereport(ERROR,
! (errmsg("invalid WAL message received from primary")));
!
! /* Return received WAL records to caller */
! *recptr = *((XLogRecPtr *) recvBuf);
! *buffer = recvBuf + sizeof(XLogRecPtr);
! *len = rawlen - sizeof(XLogRecPtr);
return true;
}
--- 297,314 ----
}
PQclear(res);
ereport(ERROR,
! (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
! (errmsg("could not receive data from XLOG stream: %s",
PQerrorMessage(streamConn))));
! /* Return received messages to caller */
! *type = *((unsigned char *) recvBuf);
! *buffer = recvBuf + sizeof(*type);
! *len = rawlen - sizeof(*type);
return true;
}
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
***************
*** 135,140 **** static void WalRcvQuickDieHandler(SIGNAL_ARGS);
--- 135,141 ----
/* Prototypes for private functions */
static void WalRcvDie(int code, Datum arg);
+ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
***************
*** 258,264 **** WalReceiverMain(void)
/* Loop until end-of-streaming or error */
for (;;)
{
! XLogRecPtr recptr;
char *buf;
int len;
--- 259,265 ----
/* Loop until end-of-streaming or error */
for (;;)
{
! unsigned char type;
char *buf;
int len;
***************
*** 287,303 **** WalReceiverMain(void)
}
/* Wait a while for data to arrive */
! if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
{
! /* Write received WAL records to disk */
! XLogWalRcvWrite(buf, len, recptr);
! /* Receive any more WAL records we can without sleeping */
! while(walrcv_receive(0, &recptr, &buf, &len))
! XLogWalRcvWrite(buf, len, recptr);
/*
! * Now that we've written some records, flush them to disk and
* let the startup process know about them.
*/
XLogWalRcvFlush();
--- 288,304 ----
}
/* Wait a while for data to arrive */
! if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
! /* Accept the received data, and process it */
! XLogWalRcvProcessMsg(type, buf, len);
! /* Receive any more data we can without sleeping */
! while(walrcv_receive(0, &type, &buf, &len))
! XLogWalRcvProcessMsg(type, buf, len);
/*
! * If we've written some records, flush them to disk and
* let the startup process know about them.
*/
XLogWalRcvFlush();
***************
*** 376,381 **** WalRcvQuickDieHandler(SIGNAL_ARGS)
--- 377,412 ----
}
/*
+ * Accept the message from XLOG stream, and process it.
+ *
+ * 'type' is the message type byte taken from the front of the CopyData
+ * payload, 'buf' points at the bytes that follow it, and 'len' is the
+ * number of those bytes.
+ *
+ * A 'w' message carries an XLogRecPtr followed by WAL data; the WAL data
+ * is handed to XLogWalRcvWrite() together with that start location.  Any
+ * other message type is reported as a protocol violation.
+ */
+ static void
+ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
+ {
+ 	switch (type)
+ 	{
+ 		case 'w':				/* WAL records */
+ 			{
+ 				XLogRecPtr	recptr;
+
+ 				/* The payload must at least hold the start location */
+ 				if (len < sizeof(XLogRecPtr))
+ 					ereport(ERROR,
+ 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 							 errmsg("invalid WAL message received from primary")));
+
+ 				/*
+ 				 * buf sits one byte (the message type) past the start of
+ 				 * the receive buffer, so it is not guaranteed to be
+ 				 * suitably aligned for XLogRecPtr.  Copy the value out
+ 				 * instead of dereferencing a cast pointer.
+ 				 */
+ 				memcpy(&recptr, buf, sizeof(XLogRecPtr));
+ 				buf += sizeof(XLogRecPtr);
+ 				len -= sizeof(XLogRecPtr);
+ 				XLogWalRcvWrite(buf, len, recptr);
+ 				break;
+ 			}
+ 		default:
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 					 errmsg("invalid replication message type %d",
+ 							type)));
+ 	}
+ }
+
+ /*
* Write XLOG data to disk.
*/
static void
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 659,664 **** XLogSend(StringInfo outMsg)
--- 659,665 ----
* have the same byte order. If they have different byte order, we
* don't reach here.
*/
+ pq_sendbyte(outMsg, 'w');
pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
if (endptr.xlogid != startptr.xlogid)
*** a/src/include/replication/walreceiver.h
--- b/src/include/replication/walreceiver.h
***************
*** 66,72 **** extern WalRcvData *WalRcv;
typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
! typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);
--- 66,73 ----
typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
! typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
! char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_disconnect_type) (void);
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers