Hi,

thanks for review.

On 17/03/16 13:36, Tomas Vondra wrote:
Hi,

a few comments about the last version of the patch:


1) LogicalDecodeMessageCB

Do we actually need the 'transactional' parameter here? I mean, having
the 'txn' should be enough, as

     transactional = (txt != NULL)


Agreed. Same goes for the ReoderBufferChange struct btw, only transactional messages go there so no point in marking them as such.



2) pg_logical_emit_message_bytea / pg_logical_emit_message_text

The comment before _bytea is wrong - it's just a copy'n'paste from the
preceding function (pg_logical_slot_peek_binary_changes). _text has no
comment at all, but it's true it's  just a simple _bytea wrapper.


Heh, blind.

The main issue here however is that the functions are not defined as
strict, but ignore the possibility that the parameters might be NULL. So
for example this crashes the backend

SELECT pg_logical_emit_message(NULL::boolean, NULL::text, NULL::text);


Good point.


3) ReorderBufferQueueMessage

No comment. Not a big deal I guess, the method is simple enough, but why
to break the rule when all the other methods around have at least a
short one?


Yeah I sometimes am not sure if there is a point to put comment to tiny straightforward functions that do more or less same as the one above. But it's public API so probably better to have one.


4) ReorderBufferChange

The new struct in the 'union' would probably deserve at least a short
comment explaining the purpose (just like the other structs around).



Okay.

Updated version attached.

(BTW please try to CC author of the patch when reviewing)


--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
>From 4515b5b3e8f5ddb96124e859b968c43c27f79be3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile                  |  2 +-
 contrib/test_decoding/expected/ddl.out          | 21 ++++--
 contrib/test_decoding/expected/messages.out     | 56 ++++++++++++++++
 contrib/test_decoding/sql/ddl.sql               |  3 +-
 contrib/test_decoding/sql/messages.sql          | 17 +++++
 contrib/test_decoding/test_decoding.c           | 17 +++++
 doc/src/sgml/func.sgml                          | 45 +++++++++++++
 doc/src/sgml/logicaldecoding.sgml               | 34 ++++++++++
 src/backend/access/rmgrdesc/Makefile            |  4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c    | 41 ++++++++++++
 src/backend/access/transam/rmgr.c               |  1 +
 src/backend/replication/logical/Makefile        |  2 +-
 src/backend/replication/logical/decode.c        | 49 ++++++++++++++
 src/backend/replication/logical/logical.c       | 37 +++++++++++
 src/backend/replication/logical/logicalfuncs.c  | 27 ++++++++
 src/backend/replication/logical/message.c       | 87 +++++++++++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 68 +++++++++++++++++++
 src/backend/replication/logical/snapbuild.c     | 19 ++++++
 src/bin/pg_xlogdump/rmgrdesc.c                  |  1 +
 src/include/access/rmgrlist.h                   |  1 +
 src/include/catalog/pg_proc.h                   |  4 ++
 src/include/replication/logicalfuncs.h          |  2 +
 src/include/replication/message.h               | 41 ++++++++++++
 src/include/replication/output_plugin.h         | 12 ++++
 src/include/replication/reorderbuffer.h         | 20 ++++++
 src/include/replication/snapbuild.h             |  2 +
 26 files changed, 601 insertions(+), 12 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 06c9546..309cb0b 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time
+	decoding_into_rel binary prepared replorigin time messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 77719e8..32cd24d 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+    ?column?    
+----------------
+ tx logical msg
+(1 row)
+
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
 GROUP BY substring(data, 1, 24)
 ORDER BY 1,2;
- count |                       min                       |                                  max                                   
--------+-------------------------------------------------+------------------------------------------------------------------------
-     1 | BEGIN                                           | BEGIN
-     1 | COMMIT                                          | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
-(3 rows)
+ count |                                  min                                  |                                  max                                   
+-------+-----------------------------------------------------------------------+------------------------------------------------------------------------
+     1 | BEGIN                                                                 | BEGIN
+     1 | COMMIT                                                                | COMMIT
+     1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg
+ 20467 | table public.tr_etoomuch: DELETE: id[integer]:1                       | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
+(4 rows)
 
 -- check updates of primary keys work correctly
 BEGIN;
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
new file mode 100644
index 0000000..7fa8256
--- /dev/null
+++ b/contrib/test_decoding/expected/messages.out
@@ -0,0 +1,56 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+ ?column? 
+----------
+ msg1
+(1 row)
+
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+ ?column? 
+----------
+ msg2
+(1 row)
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+ ?column? 
+----------
+ msg3
+(1 row)
+
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ ?column? 
+----------
+ msg4
+(1 row)
+
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+ ?column? 
+----------
+ msg5
+(1 row)
+
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+                            data                            
+------------------------------------------------------------
+ message: transactional: 1 prefix: test, sz: 4 content:msg1
+ message: transactional: 0 prefix: test, sz: 4 content:msg2
+ message: transactional: 0 prefix: test, sz: 4 content:msg4
+ message: transactional: 1 prefix: test, sz: 4 content:msg3
+ message: transactional: 1 prefix: test, sz: 4 content:msg5
+(5 rows)
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ init
+(1 row)
+
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index ad928ad..b1f7bf6 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -108,11 +108,12 @@ DELETE FROM tr_pkey;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
new file mode 100644
index 0000000..8744eb6
--- /dev/null
+++ b/contrib/test_decoding/sql/messages.sql
@@ -0,0 +1,17 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 4cf808f..1997774 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
 
 #include "replication/output_plugin.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 
 #include "utils/builtins.h"
@@ -63,6 +64,9 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
 				 ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+							  const char *prefix, Size sz, const char *message);
 
 void
 _PG_init(void)
@@ -82,6 +86,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
+	cb->message_cb = pg_decode_message;
 }
 
 
@@ -471,3 +476,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	OutputPluginWrite(ctx, true);
 }
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, XLogRecPtr lsn,
+				  const char *prefix, Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
+					 txn != NULL, prefix, sz);
+	appendBinaryStringInfo(ctx->out, message, sz);
+	OutputPluginWrite(ctx, true);
+}
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index ae93e69..02f556b 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -18238,6 +18238,51 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
        </entry>
       </row>
 
+      <row>
+       <entry id="pg-logical-emit-message-text">
+        <indexterm>
+         <primary>pg_logical_emit_message</primary>
+        </indexterm>
+        <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Write text logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written immediately
+        and decoded as soon as the logical decoding reads the record. The
+        <parameter>prefix</parameter> is textual prefix used by the logical
+        decoding plugins to easily recognize interesting messages for them.
+        The <parameter>content</parameter> is the text of the message.
+       </entry>
+      </row>
+
+      <row>
+       <entry id="pg-logical-emit-message-bytea">
+        <indexterm>
+         <primary>>pg_logical_emit_message</primary>
+        </indexterm>
+        <literal><function>>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Write binary logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written immediately
+        and decoded as soon as the logical decoding reads the record. The
+        <parameter>prefix</parameter> is textual prefix used by the logical
+        decoding plugins to easily recognize interesting messages for them.
+        The <parameter>content</parameter> is the binary content of the
+        message.
+       </entry>
+      </row>
+
      </tbody>
     </tgroup>
    </table>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 45fdfeb..7c9d03a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeBeginCB begin_cb;
     LogicalDecodeChangeCB change_cb;
     LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
@@ -602,6 +603,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (
        more efficient.
      </para>
      </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-message">
+     <title>Generic Message Callback</title>
+
+     <para>
+      The optional <function>message_cb</function> callback is called whenever
+      a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (
+    struct LogicalDecodingContext *,
+    ReorderBufferTXN *txn,
+    XLogRecPtr message_lsn,
+    const char *prefix,
+    Size message_size,
+    const char *message
+);
+</programlisting>
+      The <parameter>ctx</parameter> and <parameter>txn</parameter> parameters
+      have the same contents as for the <function>begin_cb</function> with the
+      difference that <parameter>txn</parameter> is null for non-transactional
+      messages. The <parameter>lsn</parameter> has WAL position of the message.
+      The <parameter>prefix</parameter> is arbitrary textual prefix which can
+      be used for identifying interesting messages for the current plugin.
+      And finally the <parameter>message</parameter> parameter holds the
+      actual message of <parameter>message_size</parameter> size.
+     </para>
+     <para>
+      Extra care should be taken to ensure that the prefix the output plugin
+      considers interesting is unique. Using name of the extension or the
+      output plugin itself is often a good choice.
+     </para>
+    </sect3>
+
    </sect2>
 
    <sect2 id="logicaldecoding-output-plugin-output">
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c72a1f2..723b4d8 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,8 +9,8 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
-	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
-	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
+	   hashdesc.o heapdesc.o logicalmsgdesc.o mxactdesc.o nbtdesc.o \
+	   relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicalmsgdesc.c
new file mode 100644
index 0000000..b194e14
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalmsgdesc.c
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/message.c
+ *
+ * Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/message.h"
+
+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info == XLOG_LOGICAL_MESSAGE)
+	{
+		xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+		appendStringInfo(buf, "%s message size %zu bytes",
+				   xlrec->transactional ? "transactional" : "nontransactional",
+						 xlrec->message_size);
+	}
+}
+
+const char *
+logicalmsg_identify(uint8 info)
+{
+	if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
+		return "MESSAGE";
+
+	return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..1a42121 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -23,6 +23,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 8adea13..1d7ca06 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
 	snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 13af485..98ec5d9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -39,6 +39,7 @@
 
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
@@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeHeapOp(ctx, &buf);
 			break;
 
+		case RM_LOGICALMSG_ID:
+			DecodeLogicalMsgOp(ctx, &buf);
+			break;
+
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -457,6 +463,49 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	xl_logical_message *message;
+
+	if (info != XLOG_LOGICAL_MESSAGE)
+		elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+	message = (xl_logical_message *) XLogRecGetData(r);
+
+	if (message->transactional)
+	{
+		if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
+			return;
+
+		ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
+								  buf->endptr,
+								  message->message, /* first part of message is prefix */
+								  message->message_size,
+								  message->message + message->prefix_size);
+	}
+	else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
+			 !SnapBuildXactNeedsSkip(builder, buf->origptr))
+	{
+		volatile Snapshot	snapshot_now;
+		ReorderBuffer	   *rb = ctx->reorder;
+
+		/* setup snapshot to allow catalog access */
+		snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
+		SetupHistoricSnapshot(snapshot_now, NULL);
+		rb->message(rb, NULL, buf->origptr, message->message,
+					message->message_size,
+					message->message + message->prefix_size);
+		TeardownHistoricSnapshot(false);
+	}
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9..7863d52 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr message_lsn, const char *prefix, Size sz,
+				  const char *message);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
@@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->message = message_cb_wrapper;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -702,6 +706,39 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+static void
+message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				   XLogRecPtr message_lsn, const char *prefix, Size sz,
+				   const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	if (ctx->callbacks.message_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "message";
+	state.report_location = message_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.message_cb(ctx, txn, message_lsn, prefix, sz, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f789fc1..552dac3 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -24,6 +24,8 @@
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 
+#include "access/xact.h"
+
 #include "catalog/pg_type.h"
 
 #include "nodes/makefuncs.h"
@@ -41,6 +43,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "replication/message.h"
 
 #include "storage/fd.h"
 
@@ -363,3 +366,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
 {
 	return pg_logical_slot_get_changes_guts(fcinfo, false, true);
 }
+
+
+/*
+ * SQL function for writing logical decding message into WAL.
+ */
+Datum
+pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
+{
+	bool		transactional = PG_GETARG_BOOL(0);
+	char	   *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
+	bytea	   *data = PG_GETARG_BYTEA_PP(2);
+	XLogRecPtr	lsn;
+
+	lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
+							transactional);
+	PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_emit_message_text(PG_FUNCTION_ARGS)
+{
+	/* bytea and text are compatible */
+	return pg_logical_emit_message_bytea(fcinfo);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
new file mode 100644
index 0000000..214be45
--- /dev/null
+++ b/src/backend/replication/logical/message.c
@@ -0,0 +1,87 @@
+/*-------------------------------------------------------------------------
+ *
+ * message.c
+ *	  Generic logical messages.
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/message.c
+ *
+ * NOTES
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The prefix has to be registered before the message using that
+ * prefix can be written to XLOG. The prefix can be registered exactly once to
+ * avoid situation where multiple third party extensions try to use same
+ * prefix.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/message.h"
+#include "replication/logical.h"
+
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+				  bool transactional)
+{
+	xl_logical_message	xlrec;
+
+	/*
+	 * Force xid to be allocated if we're emitting a transactional message.
+	 */
+	if (transactional)
+	{
+		Assert(IsTransactionState());
+		GetCurrentTransactionId();
+	}
+
+	xlrec.transactional = transactional;
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+	XLogRegisterData((char *) prefix, xlrec.prefix_size);
+	XLogRegisterData((char *) message, size);
+
+	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logicalmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_MESSAGE)
+			elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index f2b8f4b..d6d436e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 				change->data.tp.oldtuple = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			if (change->data.msg.prefix != NULL)
+				pfree(change->data.msg.prefix);
+			change->data.msg.prefix = NULL;
+			if (change->data.msg.message != NULL)
+				pfree(change->data.msg.message);
+			change->data.msg.message = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			if (change->data.snapshot)
 			{
@@ -627,6 +635,28 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	ReorderBufferCheckSerializeTXN(rb, txn);
 }
 
+/*
+ * Queue message into a transaction so it can be processed upon commit.
+ */
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+						  const char *prefix, Size msg_sz, const char *msg)
+{
+	ReorderBufferChange *change;
+
+	Assert(xid != InvalidTransactionId);
+
+	change = ReorderBufferGetChange(rb);
+	change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+	change->data.msg.prefix = pstrdup(prefix);
+	change->data.msg.message_size = msg_sz;
+	change->data.msg.message = palloc(msg_sz);
+	memcpy(change->data.msg.message, msg, msg_sz);
+
+	ReorderBufferQueueChange(rb, xid, lsn, change);
+}
+
+
 static void
 AssertTXNLsnOrder(ReorderBuffer *rb)
 {
@@ -1493,6 +1523,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					specinsert = change;
 					break;
 
+				case REORDER_BUFFER_CHANGE_MESSAGE:
+					rb->message(rb, txn, change->lsn,
+								change->data.msg.prefix,
+								change->data.msg.message_size,
+								change->data.msg.message);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 					/* get rid of the old */
 					TeardownHistoricSnapshot(false);
@@ -2159,6 +2196,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				}
 				break;
 			}
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			{
+				char	   *data;
+				size_t		prefix_size = strlen(change->data.msg.prefix) + 1;
+
+				sz += prefix_size + change->data.msg.message_size;
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+				memcpy(data, change->data.msg.prefix,
+					   prefix_size);
+				memcpy(data + prefix_size, change->data.msg.message,
+					   change->data.msg.message_size);
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	snap;
@@ -2415,6 +2467,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			}
 
 			break;
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			{
+				Size		message_size = change->data.msg.message_size;
+				/* prefix includes trailing zero */
+				Size		prefix_size = strlen(data) + 1;
+
+				change->data.msg.prefix = MemoryContextAlloc(rb->context,
+															 prefix_size);
+				memcpy(change->data.msg.prefix, data, prefix_size);
+				data += prefix_size;
+				change->data.msg.message = MemoryContextAlloc(rb->context,
+															  message_size);
+				memcpy(change->data.msg.message, data, message_size);
+				data += message_size;
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	oldsnap;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 179b85a..b4dc617 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -605,6 +605,25 @@ SnapBuildExportSnapshot(SnapBuild *builder)
 }
 
 /*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+	Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+	/* only build a new snapshot if we don't have a prebuilt one */
+	if (builder->snapshot == NULL)
+	{
+		builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+		/* inrease refcount for the snapshot builder */
+		SnapBuildSnapIncRefcount(builder->snapshot);
+	}
+
+	return builder->snapshot;
+}
+
+/*
  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
  * any. Aborts the previously started transaction and resets the resource
  * owner back to its original value.
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index f9cd395..6ba7f22 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -25,6 +25,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index fab912d..35c242d 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index a595327..3713739 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5114,6 +5114,10 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3577 (  pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
+DESCR("emit a textual logical decoding message");
+DATA(insert OID = 3578 (  pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
+DESCR("emit a binary logical decoding message");
 
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects		PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1df..5540414 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
 
+extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS);
 #endif
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
new file mode 100644
index 0000000..b1730c9
--- /dev/null
+++ b/src/include/replication/message.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ * message.h
+ *	   Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+	bool		transactional;					/* is message transactional? */
+	size_t		prefix_size;					/* length of prefix */
+	size_t		message_size;					/* size of the message */
+	char		message[FLEXIBLE_ARRAY_MEMBER];	/* message including the null
+												 * terminated prefx of length
+												 * prefix_size */
+} xl_logical_message;
+
+#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+									size_t size, bool transactional);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE	0x00
+void		logicalmsg_redo(XLogReaderState *record);
+void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalmsg_identify(uint8 info);
+
+#endif   /* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 577b12e..fb5b758 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,17 @@ typedef void (*LogicalDecodeCommitCB) (
 												   XLogRecPtr commit_lsn);
 
 /*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+											 struct LogicalDecodingContext *,
+											 ReorderBufferTXN *txn,
+											 XLogRecPtr message_lsn,
+											 const char *prefix,
+											 Size message_size,
+											 const char *message);
+
+/*
  * Filter changes by origin.
  */
 typedef bool (*LogicalDecodeFilterByOriginCB) (
@@ -96,6 +107,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b52d06a..c12dbf4 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -54,6 +54,7 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INSERT,
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
+	REORDER_BUFFER_CHANGE_MESSAGE,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -98,6 +99,14 @@ typedef struct ReorderBufferChange
 			ReorderBufferTupleBuf *newtuple;
 		}			tp;
 
+		/* Message with arbitrary data. */
+		struct
+		{
+			char *prefix;
+			size_t message_size;
+			char *message;
+		} msg;
+
 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
 		Snapshot	snapshot;
 
@@ -274,6 +283,14 @@ typedef void (*ReorderBufferCommitCB) (
 												   ReorderBufferTXN *txn,
 												   XLogRecPtr commit_lsn);
 
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr message_lsn,
+												   const char *prefix, Size sz,
+												   const char *message);
+
 struct ReorderBuffer
 {
 	/*
@@ -300,6 +317,7 @@ struct ReorderBuffer
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferCommitCB commit;
+	ReorderBufferMessageCB message;
 
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
@@ -350,6 +368,8 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+									  const char *prefix, Size msg_sz, const char *msg);
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 	  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 75955af..c4127a1 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -63,6 +63,8 @@ extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
 extern void SnapBuildClearExportedSnapshot(void);
 
 extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
+											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
 
-- 
1.9.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to