Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-13 Thread Andres Freund
On 2016-04-07 19:53:56 +0200, Petr Jelinek wrote:
> On 07/04/16 12:26, Andres Freund wrote:
> >Hi,
> >
> >On 2016-04-06 20:03:20 +0200, Petr Jelinek wrote:
> >>Attached patch adds filtering of both database and origin. Added tests with
> >>slightly less hardcoding than what you proposed above.
> >
> >Not a fan of creating & dropping another database - that's really rather
> >expensive. And we're in a target that doesn't support installcheck, so
> >relying on template1's existance seems fine...
> >
> 
> Makes sense, changed that bit.

I've pushed this.  I also noticed that both this patch (that's ok,
committers commonly do that, but a reminder is nice) and the original
commit ommitted to bump XLOG_PAGE_MAGIC.  The original commit also
omitted bumping catversion. btw.

Thanks for the patch.

Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-07 Thread Petr Jelinek

On 07/04/16 12:26, Andres Freund wrote:

Hi,

On 2016-04-06 20:03:20 +0200, Petr Jelinek wrote:

Attached patch adds filtering of both database and origin. Added tests with
slightly less hardcoding than what you proposed above.


Not a fan of creating & dropping another database - that's really rather
expensive. And we're in a target that doesn't support installcheck, so
relying on template1's existance seems fine...



Makes sense, changed that bit.




diff --git a/contrib/test_decoding/expected/messages.out 
b/contrib/test_decoding/expected/messages.out
index 70130e9..a5b13c5 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -1,6 +1,5 @@
  -- predictability
  SET synchronous_commit = on;
-SET client_encoding = 'utf8';


I guess that's just from the previous test with czech text?



Yeah it's cleanup after the d25379eb23383f1d2f969e65e0332b47c19aea94 .

--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index 70130e9..c75d401 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
  ?column? 
 --
@@ -71,9 +70,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
 (7 rows)
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
  ?column? 
 --
- init
+ otherdb1
+(1 row)
+
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+ ?column? 
+--
+ otherdb2
+(1 row)
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+ data 
+--
+(0 rows)
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+--
+ cleanup
 (1 row)
 
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..8e8889d 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 ERROR:  cannot setup replication origin when one is already setup
+SELECT 'this message will not be decoded' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+ ?column? 
+--
+ this message will not be decoded
+(1 row)
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index 4aedb04..cf3f773 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
@@ -22,4 +21,14 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..a33e460bb 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 
+SELECT 'this message will not be decoded' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-07 Thread Andres Freund
Hi,

On 2016-04-06 20:03:20 +0200, Petr Jelinek wrote:
> Attached patch adds filtering of both database and origin. Added tests with
> slightly less hardcoding than what you proposed above.

Not a fan of creating & dropping another database - that's really rather
expensive. And we're in a target that doesn't support installcheck, so
relying on template1's existance seems fine...


> diff --git a/contrib/test_decoding/expected/messages.out 
> b/contrib/test_decoding/expected/messages.out
> index 70130e9..a5b13c5 100644
> --- a/contrib/test_decoding/expected/messages.out
> +++ b/contrib/test_decoding/expected/messages.out
> @@ -1,6 +1,5 @@
>  -- predictability
>  SET synchronous_commit = on;
> -SET client_encoding = 'utf8';

I guess that's just from the previous test with czech text?

Greetings,

Andres Freund


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Andres Freund


On April 7, 2016 2:26:41 AM GMT+02:00, Michael Paquier 
 wrote:
>On Thu, Apr 7, 2016 at 12:55 AM, Andres Freund 
>wrote:
>> On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:
>>> Perhaps easy to solve, but how do we test it is solved?
>>
>> Maybe something like
>>
>> -- drain
>> pg_logical_slot_get_changes(...);
>> -- generate message in different database, to ensure it's not
>processed
>> -- in this database
>> \c template1
>> SELECT pg_logical_emit_message(...);
>> \c postgres
>> -- check
>> pg_logical_slot_get_changes(..);
>>
>> It's a bit ugly to hardcode database names :/
>
>When running installcheck, there is no way to be sure that databases
>template1 and/or postgres exist on a server, so this test would fail
>because of that.

No need to hardcode postgres, see Petr's reply. I'm not concerned about 
template 1 not being there -if you tinkered with things in that level it's 
unlikely that tests will succeed.  Also, remember, this is in a test cluster 
created by the regression script, and there's no installcheck support anyway 
(because of the required settings for logical decoding) anyway
-- 
Sent from my Android device with K-9 Mail. Please excuse my brevity.


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Michael Paquier
On Thu, Apr 7, 2016 at 12:55 AM, Andres Freund  wrote:
> On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:
>> Perhaps easy to solve, but how do we test it is solved?
>
> Maybe something like
>
> -- drain
> pg_logical_slot_get_changes(...);
> -- generate message in different database, to ensure it's not processed
> -- in this database
> \c template1
> SELECT pg_logical_emit_message(...);
> \c postgres
> -- check
> pg_logical_slot_get_changes(..);
>
> It's a bit ugly to hardcode database names :/

When running installcheck, there is no way to be sure that databases
template1 and/or postgres exist on a server, so this test would fail
because of that.
-- 
Michael


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Petr Jelinek

On 06/04/16 17:55, Andres Freund wrote:

On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:

Perhaps easy to solve, but how do we test it is solved?


Maybe something like

-- drain
pg_logical_slot_get_changes(...);
-- generate message in different database, to ensure it's not processed
-- in this database
\c template1
SELECT pg_logical_emit_message(...);
\c postgres
-- check
pg_logical_slot_get_changes(..);

It's a bit ugly to hardcode database names :/



Attached patch adds filtering of both database and origin. Added tests 
with slightly less hardcoding than what you proposed above.


--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index 70130e9..a5b13c5 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
  ?column? 
 --
@@ -71,9 +70,32 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
 (7 rows)
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+CREATE DATABASE test_logical_messages;
+\set prevdb :DBNAME
+\c test_logical_messages
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
  ?column? 
 --
- init
+ otherdb1
+(1 row)
+
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+ ?column? 
+--
+ otherdb2
+(1 row)
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+ data 
+--
+(0 rows)
+
+DROP DATABASE test_logical_messages;
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+--
+ cleanup
 (1 row)
 
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..8e8889d 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 ERROR:  cannot setup replication origin when one is already setup
+SELECT 'this message will not be decoded' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+ ?column? 
+--
+ this message will not be decoded
+(1 row)
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index 4aedb04..eba7d7a 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
@@ -22,4 +21,17 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+CREATE DATABASE test_logical_messages;
+\set prevdb :DBNAME
+\c test_logical_messages
+
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+DROP DATABASE test_logical_messages;
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..a33e460bb 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 
+SELECT 'this message will not be decoded' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3e80c4a..0cdb0b8 100644
--- 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Andres Freund
On 2016-04-06 16:49:17 +0100, Simon Riggs wrote:
> Perhaps easy to solve, but how do we test it is solved?

Maybe something like

-- drain
pg_logical_slot_get_changes(...);
-- generate message in different database, to ensure it's not processed
-- in this database
\c template1
SELECT pg_logical_emit_message(...);
\c postgres
-- check
pg_logical_slot_get_changes(..);

It's a bit ugly to hardcode database names :/

Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Simon Riggs
On 6 April 2016 at 15:29, Andres Freund  wrote:

> On 2016-04-06 10:24:52 -0400, Tom Lane wrote:
> > Andres Freund  writes:
> > > On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> > >> Well, that's something worth thinking about.  I assume that
> > >> pg_logical_slot_get_changes could be executed in a database different
> from
> > >> the one where a change was originated?
> >
> > > You can execute it, but you'll get an error:
> >
> > Oh good.  I was afraid we had an unrecognized can o' worms here.
>
> As posted nearby, there's a hole in that defense; for the messages
> only. Pretty easy to solve though.
>

My instinct was to put in a test for non-ascii text; even if we can't keep
that test, it has highlighted a hole we wouldn't have spotted for a while,
so I'll call that "good catch" then.

Perhaps easy to solve, but how do we test it is solved?

-- 
Simon Riggshttp://www.2ndQuadrant.com/

PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Andres Freund
On 2016-04-06 10:24:52 -0400, Tom Lane wrote:
> Andres Freund  writes:
> > On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> >> Well, that's something worth thinking about.  I assume that
> >> pg_logical_slot_get_changes could be executed in a database different from
> >> the one where a change was originated?
> 
> > You can execute it, but you'll get an error:
> 
> Oh good.  I was afraid we had an unrecognized can o' worms here.

As posted nearby, there's a hole in that defense; for the messages
only. Pretty easy to solve though.

Allowing logical decoding from a difference would have a lot of
problems; primarily we couldn't actually look up any catalog state. But
even leaving that aside, it'd end up being pretty hard to interpret
database contents without knowledge about encoding.

Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Andres Freund
On 2016-04-06 16:20:29 +0200, Andres Freund wrote:
> On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> > > In some ways it seems like the argument to pg_logical_emit_message(...) 
> > > should
> > > be 'bytea'. That'd be much more convenient for application use. But then
> > > it's a pain when using it via the text-format SQL interface calls, where
> > > we've got no sensible way to output it.
> 
> There's a bytea version.
> 
> > Well, that's something worth thinking about.  I assume that
> > pg_logical_slot_get_changes could be executed in a database different from
> > the one where a change was originated?
> 
> You can execute it, but you'll get an error:
>   if (slot->data.database != MyDatabaseId)
>   ereport(ERROR,
>   
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> (errmsg("replication slot \"%s\" was not created in this 
> database",
> NameStr(slot->data.name);

Or so I thought. A look at the code shows a lack of database filtering
in DecodeLogicalMsgOp().  I think it also misses a FilterByOrigin()
check.

Greetings,

Andres Freund


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Tom Lane
Andres Freund  writes:
> On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
>> Well, that's something worth thinking about.  I assume that
>> pg_logical_slot_get_changes could be executed in a database different from
>> the one where a change was originated?

> You can execute it, but you'll get an error:

Oh good.  I was afraid we had an unrecognized can o' worms here.

regards, tom lane


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Andres Freund
On 2016-04-06 10:15:59 -0400, Tom Lane wrote:
> > In some ways it seems like the argument to pg_logical_emit_message(...) 
> > should
> > be 'bytea'. That'd be much more convenient for application use. But then
> > it's a pain when using it via the text-format SQL interface calls, where
> > we've got no sensible way to output it.

There's a bytea version.

> Well, that's something worth thinking about.  I assume that
> pg_logical_slot_get_changes could be executed in a database different from
> the one where a change was originated?

You can execute it, but you'll get an error:
if (slot->data.database != MyDatabaseId)
ereport(ERROR,

(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  (errmsg("replication slot \"%s\" was not created in this 
database",
  NameStr(slot->data.name);


Greetings,

Andres Freund


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Tom Lane
Craig Ringer  writes:
> Interesting issue. Mainly because the "Å¥" char it complains about
> (utf-8 0xc5 0xa5) is accepted in the SELECT that generates the record.

Uh, no, actually it's the SELECT that's failing.

> The regress script in question sets:
> SET client_encoding = 'utf8';
> but we're apparently round-tripping the data through the database encoding
> at some point, then converting back to client_encoding for output.

The conversion to DB encoding will happen the instant the query string
reaches the database.  You can set client_encoding to whatever you want,
but the only characters that can appear in queries are those that exist
in both the client encoding and the database encoding.

> In some ways it seems like the argument to pg_logical_emit_message(...) should
> be 'bytea'. That'd be much more convenient for application use. But then
> it's a pain when using it via the text-format SQL interface calls, where
> we've got no sensible way to output it.

Well, that's something worth thinking about.  I assume that
pg_logical_slot_get_changes could be executed in a database different from
the one where a change was originated?  What's going to happen if a string
in WAL contains characters unrepresentable in that database?  Do we even
have logic in there that will attempt to perform the necessary conversion?
And it is *necessary*, not optional, if you are going to claim that the
output of pg_logical_slot_get_changes is of type text.

regards, tom lane


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-06 Thread Craig Ringer
Committed. https://commitfest.postgresql.org/9/468/

Buildfarm error:
http://www.postgresql.org/message-id/cab7npqrod2mxqy_5+czjvhw0whrrz6p8jv_rsblcrxrtwlh...@mail.gmail.com

Interesting issue. Mainly because the "ť" char it complains about
(utf-8 0xc5 0xa5) is accepted in the SELECT that generates the record. If
it's valid input it should be valid output, right? We didn't change the
client_encoding in the mean time.  It makes sense though:

initdb on that animal says:

The database cluster will be initialized with locale "English_United
States.1252".
The default database encoding has accordingly been set to "WIN1252".


The regress script in question sets:

SET client_encoding = 'utf8';

but we're apparently round-tripping the data through the database encoding
at some point, then converting back to client_encoding for output.

Presumably that's when we're forming the text 'data' column in the
tuplestore produced by the get changes function, which will be in the
database encoding.

So setting client_encoding is not sufficient to make this work and the
non-7-bit-ascii part should be removed from the test, since it's not
allowed on all machines.


In some ways it seems like the argument to pg_logical_emit_message(...) should
be 'bytea'. That'd be much more convenient for application use. But then
it's a pain when using it via the text-format SQL interface calls, where
we've got no sensible way to output it.

-- 
 Craig Ringer   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-04 Thread Simon Riggs
On 4 April 2016 at 05:23, Petr Jelinek  wrote:


> I rebased this patch on top of current master as the generic wal commit
> added some conflicting changes. Also fixed couple of typos in comments and
> added non ascii message to test.


This looks good to me, so have marked it Ready For Committer.

I marked myself as Committer to show there was interest in this. If anyone
else would like to commit it, I am happy for you to do so.

-- 
Simon Riggshttp://www.2ndQuadrant.com/

PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-04-03 Thread Petr Jelinek

Hi,

I rebased this patch on top of current master as the generic wal commit 
added some conflicting changes. Also fixed couple of typos in comments 
and added non ascii message to test.


--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
>From b74e6dc5956446d514130c575263f4ba6ad71db3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |   2 +-
 contrib/test_decoding/expected/ddl.out  |  21 ++--
 contrib/test_decoding/expected/messages.out |  79 
 contrib/test_decoding/sql/ddl.sql   |   3 +-
 contrib/test_decoding/sql/messages.sql  |  25 +
 contrib/test_decoding/test_decoding.c   |  18 
 doc/src/sgml/func.sgml  |  45 +
 doc/src/sgml/logicaldecoding.sgml   |  38 
 src/backend/access/rmgrdesc/Makefile|   6 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c|  41 
 src/backend/access/transam/rmgr.c   |   1 +
 src/backend/replication/logical/Makefile|   2 +-
 src/backend/replication/logical/decode.c|  46 +
 src/backend/replication/logical/logical.c   |  38 
 src/backend/replication/logical/logicalfuncs.c  |  27 ++
 src/backend/replication/logical/message.c   |  87 +
 src/backend/replication/logical/reorderbuffer.c | 121 
 src/backend/replication/logical/snapbuild.c |  19 
 src/bin/pg_xlogdump/.gitignore  |  21 +---
 src/bin/pg_xlogdump/rmgrdesc.c  |   1 +
 src/include/access/rmgrlist.h   |   1 +
 src/include/catalog/pg_proc.h   |   4 +
 src/include/replication/logicalfuncs.h  |   2 +
 src/include/replication/message.h   |  41 
 src/include/replication/output_plugin.h |  13 +++
 src/include/replication/reorderbuffer.h |  22 +
 src/include/replication/snapbuild.h |   2 +
 27 files changed, 693 insertions(+), 33 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 06c9546..309cb0b 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time
+	decoding_into_rel binary prepared replorigin time messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 77719e8..32cd24d 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+?column?
+
+ tx logical msg
+(1 row)
+
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
 GROUP BY substring(data, 1, 24)
 ORDER BY 1,2;
- count |   min   |  max   
+-+
- 1 | BEGIN   | BEGIN
- 1 | COMMIT  | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]: data[integer]:-
-(3 rows)
+ count |  min  |  max   

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-23 Thread Petr Jelinek

On 23/03/16 14:17, Alvaro Herrera wrote:

Petr Jelinek wrote:


+++ b/contrib/test_decoding/sql/messages.sql
@@ -0,0 +1,17 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
'test_decoding');
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');


No tests for a rolled back transaction?



Good point, probably worth testing especially since we have 
non-transactional messages.


--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
>From 81fc28cedc19fe0f91f882d42989c14113a40f88 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |   2 +-
 contrib/test_decoding/expected/ddl.out  |  21 ++--
 contrib/test_decoding/expected/messages.out |  71 ++
 contrib/test_decoding/sql/ddl.sql   |   3 +-
 contrib/test_decoding/sql/messages.sql  |  22 +
 contrib/test_decoding/test_decoding.c   |  18 
 doc/src/sgml/func.sgml  |  45 +
 doc/src/sgml/logicaldecoding.sgml   |  38 
 src/backend/access/rmgrdesc/Makefile|   4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c|  41 
 src/backend/access/transam/rmgr.c   |   1 +
 src/backend/replication/logical/Makefile|   2 +-
 src/backend/replication/logical/decode.c|  46 +
 src/backend/replication/logical/logical.c   |  38 
 src/backend/replication/logical/logicalfuncs.c  |  27 ++
 src/backend/replication/logical/message.c   |  85 +
 src/backend/replication/logical/reorderbuffer.c | 121 
 src/backend/replication/logical/snapbuild.c |  19 
 src/bin/pg_xlogdump/.gitignore  |  20 +---
 src/bin/pg_xlogdump/rmgrdesc.c  |   1 +
 src/include/access/rmgrlist.h   |   1 +
 src/include/catalog/pg_proc.h   |   4 +
 src/include/replication/logicalfuncs.h  |   2 +
 src/include/replication/message.h   |  41 
 src/include/replication/output_plugin.h |  13 +++
 src/include/replication/reorderbuffer.h |  22 +
 src/include/replication/snapbuild.h |   2 +
 27 files changed, 679 insertions(+), 31 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 06c9546..309cb0b 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-   decoding_into_rel binary prepared replorigin time
+   decoding_into_rel binary prepared replorigin time messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out 
b/contrib/test_decoding/expected/ddl.out
index 77719e8..32cd24d 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM 
pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical 
msg');
+?column?
+
+ tx logical msg
+(1 row)
+
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
'include-xids', '0', 'skip-empty-xacts', '1')
 GROUP BY substring(data, 1, 24)
 ORDER BY 1,2;
- count |   min 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-23 Thread Alvaro Herrera
Petr Jelinek wrote:

> +++ b/contrib/test_decoding/sql/messages.sql
> @@ -0,0 +1,17 @@
> +-- predictability
> +SET synchronous_commit = on;
> +
> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
> 'test_decoding');
> +
> +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
> +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
> +
> +BEGIN;
> +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
> +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
> +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
> +COMMIT;
> +
> +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 
> 'force-binary', '0', 'skip-empty-xacts', '1');
> +
> +SELECT 'init' FROM pg_drop_replication_slot('regression_slot');

No tests for a rolled back transaction?

-- 
Álvaro Herrerahttp://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-23 Thread Petr Jelinek

On 22/03/16 14:11, Andres Freund wrote:

On 2016-03-22 14:03:06 +0100, Petr Jelinek wrote:

On 22/03/16 12:47, Andres Freund wrote:

On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:


+
+
+ Generic Message Callback
+
+ 
+  The optional message_cb callback is called whenever
+  a logical decoding message has been decoded.
+
+typedef void (*LogicalDecodeMessageCB) (
+struct LogicalDecodingContext *,
+ReorderBufferTXN *txn,
+XLogRecPtr message_lsn,
+const char *prefix,
+Size message_size,
+const char *message
+);


I see you removed the transactional parameter. I'm doubtful that that's
a good idea: It seems like it'd be rather helpful to pass the
transaction for a nontransaction message that's emitted while an xid was
assigned?



Hmm but won't that give the output plugin even transactions that were later
aborted? That seems quite different behavior from how the txn parameter
works everywhere else.


Seems harmless to me if called out.



All right, after some consideration I agree.






+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+   SnapBuild  *builder = ctx->snapshot_builder;
+   XLogReaderState *r = buf->record;
+   uint8   info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+   xl_logical_message *message;
+
+   if (info != XLOG_LOGICAL_MESSAGE)
+   elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", 
info);
+
+   message = (xl_logical_message *) XLogRecGetData(r);
+
+   if (message->transactional)
+   {
+   if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), 
buf->origptr))
+   return;
+
+   ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
+ buf->endptr,
+ 
message->message, /* first part of message is prefix */
+ 
message->message_size,
+ message->message 
+ message->prefix_size);
+   }
+   else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
+!SnapBuildXactNeedsSkip(builder, buf->origptr))
+   {
+   volatile Snapshot   snapshot_now;
+   ReorderBuffer  *rb = ctx->reorder;
+
+   /* setup snapshot to allow catalog access */
+   snapshot_now = SnapBuildGetOrBuildSnapshot(builder, 
XLogRecGetXid(r));
+   SetupHistoricSnapshot(snapshot_now, NULL);
+   rb->message(rb, NULL, buf->origptr, message->message,
+   message->message_size,
+   message->message + 
message->prefix_size);
+   TeardownHistoricSnapshot(false);
+   }
+}


A number of things:
1) The SnapBuildProcessChange needs to be toplevel, not just for
transactional messages - we can't yet necessarily build a snapshot.


Nope, the snapshot state is checked in the else if.


2) I'm inclined to move even the non-transactional stuff to reorderbuffer.


Well, it's not doing anything with reorderbuffer but sure it can be done
(didn't do it in the attached though).


I think there'll be some interactions if we ever do some parts in
parallel and such.  I'd rather keep decode.c to only do the lowest level
stuff.



Did it that way but I do like the resulting code less.

--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
>From 6076cb5d85cab4cfe4cb99499ed49e5a0c384033 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |   2 +-
 contrib/test_decoding/expected/ddl.out  |  21 ++--
 contrib/test_decoding/expected/messages.out |  56 +++
 contrib/test_decoding/sql/ddl.sql   |   3 +-
 contrib/test_decoding/sql/messages.sql  |  17 
 contrib/test_decoding/test_decoding.c   |  18 
 doc/src/sgml/func.sgml  |  45 +
 doc/src/sgml/logicaldecoding.sgml   |  38 
 src/backend/access/rmgrdesc/Makefile|   4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c|  41 
 src/backend/access/transam/rmgr.c   |   1 +
 src/backend/replication/logical/Makefile|   2 +-
 src/backend/replication/logical/decode.c|  46 +
 src/backend/replication/logical/logical.c   |  38 
 src/backend/replication/logical/logicalfuncs.c  |  27 ++
 src/backend/replication/logical/message.c   |  85 +
 src/backend/replication/logical/reorderbuffer.c | 121 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-22 Thread Andres Freund
On 2016-03-22 14:03:06 +0100, Petr Jelinek wrote:
> On 22/03/16 12:47, Andres Freund wrote:
> >On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> >
> >>+
> >>+
> >>+ Generic Message Callback
> >>+
> >>+ 
> >>+  The optional message_cb callback is called 
> >>whenever
> >>+  a logical decoding message has been decoded.
> >>+
> >>+typedef void (*LogicalDecodeMessageCB) (
> >>+struct LogicalDecodingContext *,
> >>+ReorderBufferTXN *txn,
> >>+XLogRecPtr message_lsn,
> >>+const char *prefix,
> >>+Size message_size,
> >>+const char *message
> >>+);
> >
> >I see you removed the transactional parameter. I'm doubtful that that's
> >a good idea: It seems like it'd be rather helpful to pass the
> >transaction for a nontransaction message that's emitted while an xid was
> >assigned?
> >
> 
> Hmm but won't that give the output plugin even transactions that were later
> aborted? That seems quite different behavior from how the txn parameter
> works everywhere else.

Seems harmless to me if called out.


> >
> >>+/*
> >>+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> >>+ */
> >>+static void
> >>+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> >>+{
> >>+   SnapBuild  *builder = ctx->snapshot_builder;
> >>+   XLogReaderState *r = buf->record;
> >>+   uint8   info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> >>+   xl_logical_message *message;
> >>+
> >>+   if (info != XLOG_LOGICAL_MESSAGE)
> >>+   elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", 
> >>info);
> >>+
> >>+   message = (xl_logical_message *) XLogRecGetData(r);
> >>+
> >>+   if (message->transactional)
> >>+   {
> >>+   if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), 
> >>buf->origptr))
> >>+   return;
> >>+
> >>+   ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> >>+ buf->endptr,
> >>+ 
> >>message->message, /* first part of message is prefix */
> >>+ 
> >>message->message_size,
> >>+ 
> >>message->message + message->prefix_size);
> >>+   }
> >>+   else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> >>+!SnapBuildXactNeedsSkip(builder, buf->origptr))
> >>+   {
> >>+   volatile Snapshot   snapshot_now;
> >>+   ReorderBuffer  *rb = ctx->reorder;
> >>+
> >>+   /* setup snapshot to allow catalog access */
> >>+   snapshot_now = SnapBuildGetOrBuildSnapshot(builder, 
> >>XLogRecGetXid(r));
> >>+   SetupHistoricSnapshot(snapshot_now, NULL);
> >>+   rb->message(rb, NULL, buf->origptr, message->message,
> >>+   message->message_size,
> >>+   message->message + 
> >>message->prefix_size);
> >>+   TeardownHistoricSnapshot(false);
> >>+   }
> >>+}
> >
> >A number of things:
> >1) The SnapBuildProcessChange needs to be toplevel, not just for
> >transactional messages - we can't yet necessarily build a snapshot.
> 
> Nope, the snapshot state is checked in the else if.
> 
> >2) I'm inclined to move even the non-transactional stuff to reorderbuffer.
> 
> Well, it's not doing anything with reorderbuffer but sure it can be done
> (didn't do it in the attached though).

I think there'll be some interactions if we ever do some parts in
parallel and such.  I'd rather keep decode.c to only do the lowest level
stuff.

Greetings,

Andres Freund


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-22 Thread Petr Jelinek

On 22/03/16 12:47, Andres Freund wrote:

On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:


+
+
+ Generic Message Callback
+
+ 
+  The optional message_cb callback is called whenever
+  a logical decoding message has been decoded.
+
+typedef void (*LogicalDecodeMessageCB) (
+struct LogicalDecodingContext *,
+ReorderBufferTXN *txn,
+XLogRecPtr message_lsn,
+const char *prefix,
+Size message_size,
+const char *message
+);


I see you removed the transactional parameter. I'm doubtful that that's
a good idea: It seems like it'd be rather helpful to pass the
transaction for a nontransaction message that's emitted while an xid was
assigned?



Hmm but won't that give the output plugin even transactions that were 
later aborted? That seems quite different behavior from how the txn 
parameter works everywhere else.





+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+   SnapBuild  *builder = ctx->snapshot_builder;
+   XLogReaderState *r = buf->record;
+   uint8   info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+   xl_logical_message *message;
+
+   if (info != XLOG_LOGICAL_MESSAGE)
+   elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", 
info);
+
+   message = (xl_logical_message *) XLogRecGetData(r);
+
+   if (message->transactional)
+   {
+   if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), 
buf->origptr))
+   return;
+
+   ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
+ buf->endptr,
+ 
message->message, /* first part of message is prefix */
+ 
message->message_size,
+ message->message 
+ message->prefix_size);
+   }
+   else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
+!SnapBuildXactNeedsSkip(builder, buf->origptr))
+   {
+   volatile Snapshot   snapshot_now;
+   ReorderBuffer  *rb = ctx->reorder;
+
+   /* setup snapshot to allow catalog access */
+   snapshot_now = SnapBuildGetOrBuildSnapshot(builder, 
XLogRecGetXid(r));
+   SetupHistoricSnapshot(snapshot_now, NULL);
+   rb->message(rb, NULL, buf->origptr, message->message,
+   message->message_size,
+   message->message + 
message->prefix_size);
+   TeardownHistoricSnapshot(false);
+   }
+}


A number of things:
1) The SnapBuildProcessChange needs to be toplevel, not just for
transactional messages - we can't yet necessarily build a snapshot.


Nope, the snapshot state is checked in the else if.


2) I'm inclined to move even the non-transactional stuff to reorderbuffer.


Well, it's not doing anything with reorderbuffer but sure it can be done 
(didn't do it in the attached though).



3) This lacks error handling, we surely don't want to error out while
still having the historic snapshot setup
4) Without 3) the volatile is bogus.
5) Misses a ReorderBufferProcessXid() call.


Fixed (all 3 above).





@@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, 
ReorderBufferChange *change)
change->data.tp.oldtuple = NULL;
}
break;
+   case REORDER_BUFFER_CHANGE_MESSAGE:
+   if (change->data.msg.prefix != NULL)
+   pfree(change->data.msg.prefix);
+   change->data.msg.prefix = NULL;
+   if (change->data.msg.message != NULL)
+   pfree(change->data.msg.message);
+   change->data.msg.message = NULL;
+   break;


Hm, this will have some overhead, but I guess the messages won't be
super frequent, and usually not very large.


Yeah but since we don't really know the size of the future messages it's 
hard to have some preallocated buffer for this so I dunno how else to do it.





+/*
+ * Queue message into a transaction so it can be processed upon commit.
+ */
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+ const char *prefix, Size 
msg_sz, const char *msg)
+{
+   ReorderBufferChange *change;
+
+   Assert(xid != InvalidTransactionId);
+
+   change = ReorderBufferGetChange(rb);
+   change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+   change->data.msg.prefix = pstrdup(prefix);
+   change->data.msg.message_size = msg_sz;
+   

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-22 Thread Andres Freund
On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> Just noticed there is missing symlink in the pg_xlogdump.

>  create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
>  create mode 12 src/bin/pg_xlogdump/logicalmsgdesc.c

Uh, src/bin/pg_xlogdump/logicalmsgdesc.c shouldn't be there. The symlink
is supposed to be automatically created by the Makefile.

Were you perhaps confused because it showed up in git status? If so,
that's probably because it isn't in
src/bin/pg_xlogdump/.gitignore. Perhaps we should change that file to
ignore *desc.c?

> +  
> +   
> +
> + pg_logical_emit_message
> +
> +
> pg_logical_emit_message(transactional
>  bool, prefix text, 
> content text)
> +   
> +   
> +void
> +   
> +   
> +Write text logical decoding message. This can be used to pass generic
> +messages to logical decoding plugins through WAL. The parameter
> +transactional specifies if the message should
> +be part of current transaction or if it should be written immediately
> +and decoded as soon as the logical decoding reads the record. The
> +prefix is textual prefix used by the logical
> +decoding plugins to easily recognize interesting messages for them.
> +The content is the text of the message.
> +   
> +  

s/write/emit/?

> +
> +
> + Generic Message Callback
> +
> + 
> +  The optional message_cb callback is called 
> whenever
> +  a logical decoding message has been decoded.
> +
> +typedef void (*LogicalDecodeMessageCB) (
> +struct LogicalDecodingContext *,
> +ReorderBufferTXN *txn,
> +XLogRecPtr message_lsn,
> +const char *prefix,
> +Size message_size,
> +const char *message
> +);

I see you removed the transactional parameter. I'm doubtful that that's
a good idea: It seems like it'd be rather helpful to pass the
transaction for a nontransaction message that's emitted while an xid was
assigned?


> +/*
> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> + */
> +static void
> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> + SnapBuild  *builder = ctx->snapshot_builder;
> + XLogReaderState *r = buf->record;
> + uint8   info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> + xl_logical_message *message;
> +
> + if (info != XLOG_LOGICAL_MESSAGE)
> + elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", 
> info);
> +
> + message = (xl_logical_message *) XLogRecGetData(r);
> +
> + if (message->transactional)
> + {
> + if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), 
> buf->origptr))
> + return;
> +
> + ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> +   buf->endptr,
> +   
> message->message, /* first part of message is prefix */
> +   
> message->message_size,
> +   
> message->message + message->prefix_size);
> + }
> + else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> +  !SnapBuildXactNeedsSkip(builder, buf->origptr))
> + {
> + volatile Snapshot   snapshot_now;
> + ReorderBuffer  *rb = ctx->reorder;
> +
> + /* setup snapshot to allow catalog access */
> + snapshot_now = SnapBuildGetOrBuildSnapshot(builder, 
> XLogRecGetXid(r));
> + SetupHistoricSnapshot(snapshot_now, NULL);
> + rb->message(rb, NULL, buf->origptr, message->message,
> + message->message_size,
> + message->message + 
> message->prefix_size);
> + TeardownHistoricSnapshot(false);
> + }
> +}

A number of things:
1) The SnapBuildProcessChange needs to be toplevel, not just for
   transactional messages - we can't yet necessarily build a snapshot.
2) I'm inclined to move even the non-transactional stuff to reorderbuffer.
3) This lacks error handling, we surely don't want to error out while
   still having the historic snapshot setup
4) Without 3) the volatile is bogus.
5) Misses a ReorderBufferProcessXid() call.

> + * Every message carries prefix to avoid conflicts between different decoding
> + * plugins. The prefix has to be registered before the message using that
> + * prefix can be written to XLOG. The prefix can be registered exactly once 
> to
> + * avoid situation where multiple third party extensions try to use same
> + * prefix.

Outdated afaics?


> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, 
> ReorderBufferChange *change)
>   change->data.tp.oldtuple = NULL;
>   }

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-21 Thread Petr Jelinek

Just noticed there is missing symlink in the pg_xlogdump.

--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
>From f47730e5e8ef5797c7595aafcbf8cff3b375d0ad Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |  2 +-
 contrib/test_decoding/expected/ddl.out  | 21 --
 contrib/test_decoding/expected/messages.out | 56 
 contrib/test_decoding/sql/ddl.sql   |  3 +-
 contrib/test_decoding/sql/messages.sql  | 17 +
 contrib/test_decoding/test_decoding.c   | 17 +
 doc/src/sgml/func.sgml  | 45 +
 doc/src/sgml/logicaldecoding.sgml   | 34 ++
 src/backend/access/rmgrdesc/Makefile|  4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c| 41 
 src/backend/access/transam/rmgr.c   |  1 +
 src/backend/replication/logical/Makefile|  2 +-
 src/backend/replication/logical/decode.c| 49 ++
 src/backend/replication/logical/logical.c   | 37 +++
 src/backend/replication/logical/logicalfuncs.c  | 27 
 src/backend/replication/logical/message.c   | 87 +
 src/backend/replication/logical/reorderbuffer.c | 68 +++
 src/backend/replication/logical/snapbuild.c | 19 ++
 src/bin/pg_xlogdump/logicalmsgdesc.c|  1 +
 src/bin/pg_xlogdump/rmgrdesc.c  |  1 +
 src/include/access/rmgrlist.h   |  1 +
 src/include/catalog/pg_proc.h   |  4 ++
 src/include/replication/logicalfuncs.h  |  2 +
 src/include/replication/message.h   | 41 
 src/include/replication/output_plugin.h | 12 
 src/include/replication/reorderbuffer.h | 20 ++
 src/include/replication/snapbuild.h |  2 +
 27 files changed, 602 insertions(+), 12 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 12 src/bin/pg_xlogdump/logicalmsgdesc.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 06c9546..309cb0b 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time
+	decoding_into_rel binary prepared replorigin time messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 77719e8..32cd24d 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+?column?
+
+ tx logical msg
+(1 row)
+
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
 GROUP BY substring(data, 1, 24)
 ORDER BY 1,2;
- count |   min   |  max   
+-+
- 1 | BEGIN   | BEGIN
- 1 | COMMIT  | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]: data[integer]:-
-(3 rows)
+ count |  min  |  max   
+---+---+
+ 1 | BEGIN  

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-21 Thread Craig Ringer
On 18 March 2016 at 20:36, Artur Zakirov  wrote:

> On 17.03.2016 15:42, Craig Ringer wrote:
>
>>
>>
>> Would you mind sharing the plugin here? I could add it to
>> src/test/modules and add some t/ tests so it runs under the TAP test
>> framework.
>>
>>
>> --
>>   Craig Ringer http://www.2ndQuadrant.com/
>>   PostgreSQL Development, 24x7 Support, Training & Services
>>
>
> Of course. I attached it.
>

Thanks for that.

Since it incorporates fairly significant chunks of code from a number of
places and is really a proof of concept rather than demo/example I don't
think it can really be included as a TAP test module, not without more
rewriting than I currently have time for anyway.



> But it shows concept of DDL replication.
>

Yeah, a part of it anyway. As I outlined in another thread some time ago,
getting DDL replication right is fairly tricky and requires more changes
than just capturing the raw DDL (or even deparsed DDL) and sending it over
the wire.

It's a useful proof of concept, but way too big to use as an in-tree
regression for logical WAL messages as I was hoping.

-- 
 Craig Ringer   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-21 Thread Petr Jelinek

Hi,

thanks for review.

On 17/03/16 13:36, Tomas Vondra wrote:

Hi,

a few comments about the last version of the patch:


1) LogicalDecodeMessageCB

Do we actually need the 'transactional' parameter here? I mean, having
the 'txn' should be enough, as

 transactional = (txt != NULL)



Agreed. Same goes for the ReoderBufferChange struct btw, only 
transactional messages go there so no point in marking them as such.





2) pg_logical_emit_message_bytea / pg_logical_emit_message_text

The comment before _bytea is wrong - it's just a copy'n'paste from the
preceding function (pg_logical_slot_peek_binary_changes). _text has no
comment at all, but it's true it's  just a simple _bytea wrapper.



Heh, blind.


The main issue here however is that the functions are not defined as
strict, but ignore the possibility that the parameters might be NULL. So
for example this crashes the backend

SELECT pg_logical_emit_message(NULL::boolean, NULL::text, NULL::text);



Good point.



3) ReorderBufferQueueMessage

No comment. Not a big deal I guess, the method is simple enough, but why
to break the rule when all the other methods around have at least a
short one?



Yeah I sometimes am not sure if there is a point to put comment to tiny 
straightforward functions that do more or less same as the one above. 
But it's public API so probably better to have one.




4) ReorderBufferChange

The new struct in the 'union' would probably deserve at least a short
comment explaining the purpose (just like the other structs around).




Okay.

Updated version attached.

(BTW please try to CC author of the patch when reviewing)


--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
>From 4515b5b3e8f5ddb96124e859b968c43c27f79be3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |  2 +-
 contrib/test_decoding/expected/ddl.out  | 21 --
 contrib/test_decoding/expected/messages.out | 56 
 contrib/test_decoding/sql/ddl.sql   |  3 +-
 contrib/test_decoding/sql/messages.sql  | 17 +
 contrib/test_decoding/test_decoding.c   | 17 +
 doc/src/sgml/func.sgml  | 45 +
 doc/src/sgml/logicaldecoding.sgml   | 34 ++
 src/backend/access/rmgrdesc/Makefile|  4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c| 41 
 src/backend/access/transam/rmgr.c   |  1 +
 src/backend/replication/logical/Makefile|  2 +-
 src/backend/replication/logical/decode.c| 49 ++
 src/backend/replication/logical/logical.c   | 37 +++
 src/backend/replication/logical/logicalfuncs.c  | 27 
 src/backend/replication/logical/message.c   | 87 +
 src/backend/replication/logical/reorderbuffer.c | 68 +++
 src/backend/replication/logical/snapbuild.c | 19 ++
 src/bin/pg_xlogdump/rmgrdesc.c  |  1 +
 src/include/access/rmgrlist.h   |  1 +
 src/include/catalog/pg_proc.h   |  4 ++
 src/include/replication/logicalfuncs.h  |  2 +
 src/include/replication/message.h   | 41 
 src/include/replication/output_plugin.h | 12 
 src/include/replication/reorderbuffer.h | 20 ++
 src/include/replication/snapbuild.h |  2 +
 26 files changed, 601 insertions(+), 12 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 06c9546..309cb0b 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time
+	decoding_into_rel binary prepared replorigin time messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 77719e8..32cd24d 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-19 Thread Tomas Vondra

Hi,

a few comments about the last version of the patch:


1) LogicalDecodeMessageCB

Do we actually need the 'transactional' parameter here? I mean, having 
the 'txn' should be enough, as


transactional = (txt != NULL)

Of course, having a simple flag is more convenient.


2) pg_logical_emit_message_bytea / pg_logical_emit_message_text

The comment before _bytea is wrong - it's just a copy'n'paste from the 
preceding function (pg_logical_slot_peek_binary_changes). _text has no 
comment at all, but it's true it's  just a simple _bytea wrapper.


The main issue here however is that the functions are not defined as 
strict, but ignore the possibility that the parameters might be NULL. So 
for example this crashes the backend


SELECT pg_logical_emit_message(NULL::boolean, NULL::text, NULL::text);


3) ReorderBufferQueueMessage

No comment. Not a big deal I guess, the method is simple enough, but why 
to break the rule when all the other methods around have at least a 
short one?



4) ReorderBufferChange

The new struct in the 'union' would probably deserve at least a short 
comment explaining the purpose (just like the other structs around).



regards

--
Tomas Vondra  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-18 Thread Craig Ringer
On 17 March 2016 at 19:08, Artur Zakirov  wrote:

> On 16.03.2016 18:56, David Steele wrote:
>
>>
>> This patch applies cleanly and is ready for review with no outstanding
>> issues that I can see.  Simon and Artur, you are both signed up as
>> reviewers.  Care to take a crack at it?
>>
>> Thanks,
>>
>>
> I have tested the patch once again and have looked the code. It looks good
> for me. I haven't any observation.
>
> The patch does its function correctly. I have tested it with a plugin,
> which writes DDL queries to the WAL using a hook and replicates this
> queries at subscribers.
>

Would you mind sharing the plugin here? I could add it to src/test/modules
and add some t/ tests so it runs under the TAP test framework.


-- 
 Craig Ringer   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-18 Thread Artur Zakirov

On 16.03.2016 18:56, David Steele wrote:


This patch applies cleanly and is ready for review with no outstanding
issues that I can see.  Simon and Artur, you are both signed up as
reviewers.  Care to take a crack at it?

Thanks,



I have tested the patch once again and have looked the code. It looks 
good for me. I haven't any observation.


The patch does its function correctly. I have tested it with a plugin, 
which writes DDL queries to the WAL using a hook and replicates this 
queries at subscribers.


If Simon is not against, the patch can be marked as "Ready for Commiter".

--
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-18 Thread David Steele
On 3/9/16 6:58 PM, Petr Jelinek wrote:
> On 08/03/16 21:21, Artur Zakirov wrote:
>> I think here
>>
>>> +const char *
>>> +logicalmsg_identify(uint8 info)
>>> +{
>>> +if (info & ~XLR_INFO_MASK == XLOG_LOGICAL_MESSAGE)
>>> +return "MESSAGE";
>>> +
>>> +return NULL;
>>> +}
>>
>> we should use brackets
>>
>> const char *
>> logicalmsg_identify(uint8 info)
>> {
>>  if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
>>  return "MESSAGE";
>>
>>  return NULL;
>> }
>>
> 
> Correct, fixed, thanks.
> 
> I also rebased this as there was conflict after the fixes to logical
> decoding by Andres.

This patch applies cleanly and is ready for review with no outstanding
issues that I can see.  Simon and Artur, you are both signed up as
reviewers.  Care to take a crack at it?

Thanks,
-- 
-David
da...@pgmasters.net


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-09 Thread Petr Jelinek

On 08/03/16 21:21, Artur Zakirov wrote:

I think here


+const char *
+logicalmsg_identify(uint8 info)
+{
+if (info & ~XLR_INFO_MASK == XLOG_LOGICAL_MESSAGE)
+return "MESSAGE";
+
+return NULL;
+}


we should use brackets

const char *
logicalmsg_identify(uint8 info)
{
 if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
 return "MESSAGE";

 return NULL;
}



Correct, fixed, thanks.

I also rebased this as there was conflict after the fixes to logical 
decoding by Andres.



--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
>From f65b4d858067af520e853cafb4fdfd11b6b3fdc0 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |  2 +-
 contrib/test_decoding/expected/ddl.out  | 21 --
 contrib/test_decoding/expected/messages.out | 56 
 contrib/test_decoding/sql/ddl.sql   |  3 +-
 contrib/test_decoding/sql/messages.sql  | 17 +
 contrib/test_decoding/test_decoding.c   | 19 ++
 doc/src/sgml/func.sgml  | 45 +
 doc/src/sgml/logicaldecoding.sgml   | 37 +++
 src/backend/access/rmgrdesc/Makefile|  4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c| 41 
 src/backend/access/transam/rmgr.c   |  1 +
 src/backend/replication/logical/Makefile|  2 +-
 src/backend/replication/logical/decode.c| 49 ++
 src/backend/replication/logical/logical.c   | 38 +++
 src/backend/replication/logical/logicalfuncs.c  | 27 
 src/backend/replication/logical/message.c   | 87 +
 src/backend/replication/logical/reorderbuffer.c | 67 +++
 src/backend/replication/logical/snapbuild.c | 19 ++
 src/bin/pg_xlogdump/rmgrdesc.c  |  1 +
 src/include/access/rmgrlist.h   |  1 +
 src/include/catalog/pg_proc.h   |  4 ++
 src/include/replication/logicalfuncs.h  |  2 +
 src/include/replication/message.h   | 41 
 src/include/replication/output_plugin.h | 13 
 src/include/replication/reorderbuffer.h | 21 ++
 src/include/replication/snapbuild.h |  2 +
 26 files changed, 608 insertions(+), 12 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 200c43e..7568531 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time
+	decoding_into_rel binary prepared replorigin time messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 77719e8..32cd24d 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (7 rows)
 
 /*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
  */
 BEGIN;
 CREATE TABLE tr_etoomuch (id serial primary key, data int);
 INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+?column?
+
+ tx logical msg
+(1 row)
+
 DELETE FROM tr_etoomuch WHERE id < 5000;
 UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
 COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
 FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
 GROUP BY substring(data, 1, 24)
 ORDER BY 1,2;
- count |   min   |  max   
+-+
- 1 | BEGIN   | BEGIN
- 1 | COMMIT  | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]: data[integer]:-
-(3 rows)
+ count |  

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-03-08 Thread Artur Zakirov

I think here


+const char *
+logicalmsg_identify(uint8 info)
+{
+   if (info & ~XLR_INFO_MASK == XLOG_LOGICAL_MESSAGE)
+   return "MESSAGE";
+
+   return NULL;
+}


we should use brackets

const char *
logicalmsg_identify(uint8 info)
{
if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
return "MESSAGE";

return NULL;
}

Because of operator priorities 
http://en.cppreference.com/w/c/language/operator_precedence we may get 
errors.


On 01.03.2016 00:10, Petr Jelinek wrote:

Hi,

attached is the newest version of the patch.

I removed the registry, renamed the 'send' to 'emit', documented the
callback parameters properly. I also added the test to ddl.sql for the
serialization and deserialization (and of course found a bug there) and
in general fixed all the stuff Andres reported.

(see more inline)


--
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-29 Thread Petr Jelinek

Hi,

attached is the newest version of the patch.

I removed the registry, renamed the 'send' to 'emit', documented the 
callback parameters properly. I also added the test to ddl.sql for the 
serialization and deserialization (and of course found a bug there) and 
in general fixed all the stuff Andres reported.


(see more inline)

On 28/02/16 22:55, Andres Freund wrote:






+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const 
char *prefix, Size msg_sz,
+ const char *msg)
+{
+   ReorderBufferTXN *txn = NULL;
+
+   if (transactional)
+   {
+   ReorderBufferChange *change;
+
+   txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+   Assert(xid != InvalidTransactionId);
+   Assert(txn != NULL);
+
+   change = ReorderBufferGetChange(rb);
+   change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+   change->data.msg.transactional = true;
+   change->data.msg.prefix = pstrdup(prefix);
+   change->data.msg.message_size = msg_sz;
+   change->data.msg.message = palloc(msg_sz);
+   memcpy(change->data.msg.message, msg, msg_sz);
+
+   ReorderBufferQueueChange(rb, xid, lsn, change);
+   }
+   else
+   {
+   rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
+   }
+}



This approach prohibts catalog access when processing a nontransaction
message as there's no snapshot set up.



Hmm I do see usefulness in having snapshot, although I wonder if that does
not kill the point of non-transactional messages.


I don't see how it would? It'd obviously have to be the catalog/historic
snapshot a transaction would have had if it started in that moment in
the original WAL stream?



Question is then though which snapshot should the message see,
base_snapshot of transaction?


Well, there'll not be a transaction, but something like snapbuild.c's
->snapshot ought to do the trick.



Ok I added interface which returns either existing snapshot or makes new 
one, seems like the most reasonable thing to do to me.





That would mean we'd have to call SnapBuildProcessChange for
non-transactional messages which we currently avoid. Alternatively we
could probably invent lighter version of that interface that would
just make sure builder->snapshot is valid and if not then build it


I think the latter is probably the direction we should go in.



I am honestly sure if that's a win or not.


I think it'll be confusing (bug inducing) if there's no snapshot for
non-transactional messages but for transactional ones, and it'll
severely limit the usefulness of the interface.



Nono, I meant I am not sure if special interface is a win over just 
using SnapBuildProcessChange() in practice.



--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 2f037a757d9cec09f04457d82cdd1256b8255b78 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |  2 +-
 contrib/test_decoding/expected/ddl.out  | 21 --
 contrib/test_decoding/expected/messages.out | 56 
 contrib/test_decoding/sql/ddl.sql   |  3 +-
 contrib/test_decoding/sql/messages.sql  | 17 +
 contrib/test_decoding/test_decoding.c   | 19 ++
 doc/src/sgml/func.sgml  | 45 +
 doc/src/sgml/logicaldecoding.sgml   | 37 +++
 src/backend/access/rmgrdesc/Makefile|  4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c| 41 
 src/backend/access/transam/rmgr.c   |  1 +
 src/backend/replication/logical/Makefile|  2 +-
 src/backend/replication/logical/decode.c| 49 ++
 src/backend/replication/logical/logical.c   | 38 +++
 src/backend/replication/logical/logicalfuncs.c  | 27 
 src/backend/replication/logical/message.c   | 87 +
 src/backend/replication/logical/reorderbuffer.c | 67 +++
 src/backend/replication/logical/snapbuild.c | 19 ++
 src/bin/pg_xlogdump/rmgrdesc.c  |  1 +
 src/include/access/rmgrlist.h   |  1 +
 src/include/catalog/pg_proc.h   |  4 ++
 src/include/replication/logicalfuncs.h  |  2 +
 src/include/replication/message.h   | 41 
 src/include/replication/output_plugin.h | 13 
 src/include/replication/reorderbuffer.h | 21 ++
 src/include/replication/snapbuild.h |  2 +
 26 files changed, 608 insertions(+), 12 deletions(-)
 create mode 100644 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-28 Thread Andres Freund
Hi,

On 2016-02-28 22:44:12 +0100, Petr Jelinek wrote:
> On 27/02/16 01:05, Andres Freund wrote:
> >I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
> >not much documentation about what it actually is supposed to
> >acomplish. Afaics you're basically forced to use
> >shared_preload_libraries with it right now?  Also, iterating through a
> >linked list everytime something is logged doesn't seem very satisfying?
> >
> 
> Well, my reasoning there was to stop multiple plugins from using same prefix
> and for that you need some kind of registry. Making this a shared catalog
> seemed like huge overkill given the potentially transient nature of output
> plugins and this was the best I could come up with. And yes it requires you
> to load your plugin before you can log a message for it.

I think right now that's a solution that's worse than the problem.  I'm
inclined to define the problem away with something like "The prefix
should be unique across different users of the messaging facility. Using
the extension name often is a good choice.".


> >>+void
> >>+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
> >>+{
> >>+   char   *rec = XLogRecGetData(record);
> >>+   xl_logical_message *xlrec = (xl_logical_message *) rec;
> >>+
> >>+   appendStringInfo(buf, "%s message size %zu bytes",
> >>+xlrec->transactional ? "transactional" 
> >>: "nontransactional",
> >>+xlrec->message_size);
> >>+}
> >
> >Shouldn't we check
> >   uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
> >   if XLogRecGetInfo(record) == XLOG_LOGICAL_MESSAGE
> >here?
> >
> 
> I thought it's kinda pointless, but we seem to be doing it in other places
> so will add.

It leads to a segfault or something similar when adding further message
types, without a compiler warning. So it seems to be a good idea to be
slightly careful.


> >
> >>+void
> >>+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr 
> >>lsn,
> >>+ bool transactional, const 
> >>char *prefix, Size msg_sz,
> >>+ const char *msg)
> >>+{
> >>+   ReorderBufferTXN *txn = NULL;
> >>+
> >>+   if (transactional)
> >>+   {
> >>+   ReorderBufferChange *change;
> >>+
> >>+   txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> >>+
> >>+   Assert(xid != InvalidTransactionId);
> >>+   Assert(txn != NULL);
> >>+
> >>+   change = ReorderBufferGetChange(rb);
> >>+   change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> >>+   change->data.msg.transactional = true;
> >>+   change->data.msg.prefix = pstrdup(prefix);
> >>+   change->data.msg.message_size = msg_sz;
> >>+   change->data.msg.message = palloc(msg_sz);
> >>+   memcpy(change->data.msg.message, msg, msg_sz);
> >>+
> >>+   ReorderBufferQueueChange(rb, xid, lsn, change);
> >>+   }
> >>+   else
> >>+   {
> >>+   rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
> >>+   }
> >>+}
> >
> >
> >This approach prohibts catalog access when processing a nontransaction
> >message as there's no snapshot set up.
> >
> 
> Hmm I do see usefulness in having snapshot, although I wonder if that does
> not kill the point of non-transactional messages.

I don't see how it would? It'd obviously have to be the catalog/historic
snapshot a transaction would have had if it started in that moment in
the original WAL stream?


> Question is then though which snapshot should the message see,
> base_snapshot of transaction?

Well, there'll not be a transaction, but something like snapbuild.c's
->snapshot ought to do the trick.


> That would mean we'd have to call SnapBuildProcessChange for
> non-transactional messages which we currently avoid. Alternatively we
> could probably invent lighter version of that interface that would
> just make sure builder->snapshot is valid and if not then build it

I think the latter is probably the direction we should go in.


> I am honestly sure if that's a win or not.

I think it'll be confusing (bug inducing) if there's no snapshot for
non-transactional messages but for transactional ones, and it'll
severely limit the usefulness of the interface.


Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-28 Thread Petr Jelinek

Hi,

thanks for looking Andres,


On 27/02/16 01:05, Andres Freund wrote:

Hi,

I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
acomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now?  Also, iterating through a
linked list everytime something is logged doesn't seem very satisfying?



Well, my reasoning there was to stop multiple plugins from using same 
prefix and for that you need some kind of registry. Making this a shared 
catalog seemed like huge overkill given the potentially transient nature 
of output plugins and this was the best I could come up with. And yes it 
requires you to load your plugin before you can log a message for it.


I am not married to this solution so if you have better ideas or if you 
think the clashes are not read issue, we can change it.




On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:

+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
'test_decoding');
+ ?column?
+--
+ init
+(1 row)



+SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
+ ?column?
+--
+ msg1
+(1 row)


Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?


+  
+   
+
+ pg_logical_send_message
+
+pg_logical_send_message(transactional bool, 
prefix text, content 
text)
+   
+   
+void
+   
+   
+Write text logical decoding message. This can be used to pass generic
+messages to logical decoding plugins through WAL. The parameter
+transactional specifies if the message should
+be part of current transaction or if it should be written and decoded
+immediately. The prefix has to be prefix which
+was registered by a plugin. The content is
+content of the message.
+   
+  


It's not decoded immediately, even if emitted non-transactionally.



Okay, immediately is somewhat misleading. How does "should be written 
immediately and decoded as soon as logical decoding reads the WAL 
record" sound ?



+
+ Generic Message Callback
+
+ 
+  The optional message_cb callback is called whenever
+  a logical decoding message has been decoded.
+
+typedef void (*LogicalDecodeMessageCB) (
+struct LogicalDecodingContext *,
+ReorderBufferTXN *txn,
+XLogRecPtr message_lsn,
+bool transactional,
+const char *prefix,
+Size message_size,
+const char *message
+);


We should at least document what txn is set to if not transactional.



Will do (it's NULL).


+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+   char   *rec = XLogRecGetData(record);
+   xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+   appendStringInfo(buf, "%s message size %zu bytes",
+xlrec->transactional ? "transactional" : 
"nontransactional",
+xlrec->message_size);
+}


Shouldn't we check
   uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
   if XLogRecGetInfo(record) == XLOG_LOGICAL_MESSAGE
here?



I thought it's kinda pointless, but we seem to be doing it in other 
places so will add.





+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const 
char *prefix, Size msg_sz,
+ const char *msg)
+{
+   ReorderBufferTXN *txn = NULL;
+
+   if (transactional)
+   {
+   ReorderBufferChange *change;
+
+   txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+   Assert(xid != InvalidTransactionId);
+   Assert(txn != NULL);
+
+   change = ReorderBufferGetChange(rb);
+   change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+   change->data.msg.transactional = true;
+   change->data.msg.prefix = pstrdup(prefix);
+   change->data.msg.message_size = msg_sz;
+   change->data.msg.message = palloc(msg_sz);
+   memcpy(change->data.msg.message, msg, msg_sz);
+
+   ReorderBufferQueueChange(rb, xid, lsn, change);
+   }
+   else
+   {
+   rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
+   }
+}



This approach prohibts catalog access when processing a nontransaction
message as there's no snapshot set up.



Hmm I do see usefulness in having snapshot, although I wonder if that 
does not kill the point of non-transactional messages. Question is then 
though which snapshot should the message see, base_snapshot of 
transaction? That would mean we'd have to call SnapBuildProcessChange 
for non-transactional messages which we currently avoid. Alternatively 
we could probably 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-27 Thread Artur Zakirov

Hello,

On 27.02.2016 03:05, Andres Freund wrote:

Hi,

I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
acomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now?  Also, iterating through a
linked list everytime something is logged doesn't seem very satisfying?



I have did some tests with a simple plugin. I have used event triggers 
to send messages. It works, but I agree with Andres. We have problems if 
plugin is not loaded. For example, if you will execute the query:


SELECT 'msg2' FROM pg_logical_send_message(false, 'test', 'msg2');

you will get the error (if plugin which should register a prefix is not 
loaded yet):


ERROR:  standby message prefix "test" is not registered

Some stylistic note (logical.c):


+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+  XLogRecPtr 
message_lsn,
+  bool transactional, 
const char *prefix,
+  Size sz, const char 
*message)
+{
+   LogicalDecodingContext *ctx = cache->private_data;
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;


It should be written in the following way:

static void
message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
   XLogRecPtr message_lsn,
   bool transactional, const char *prefix,
   Size sz, const char *message)
{
LogicalDecodingContext *ctx = cache->private_data;
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;

--
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-26 Thread Andres Freund
Hi,

I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
acomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now?  Also, iterating through a
linked list everytime something is logged doesn't seem very satisfying?


On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
> +SET synchronous_commit = on;
> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
> 'test_decoding');
> + ?column? 
> +--
> + init
> +(1 row)

> +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
> + ?column? 
> +--
> + msg1
> +(1 row)

Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?

> +  
> +   
> +
> + pg_logical_send_message
> +
> +
> pg_logical_send_message(transactional
>  bool, prefix text, 
> content text)
> +   
> +   
> +void
> +   
> +   
> +Write text logical decoding message. This can be used to pass generic
> +messages to logical decoding plugins through WAL. The parameter
> +transactional specifies if the message should
> +be part of current transaction or if it should be written and decoded
> +immediately. The prefix has to be prefix which
> +was registered by a plugin. The content is
> +content of the message.
> +   
> +  

It's not decoded immediately, even if emitted non-transactionally.

> +
> + Generic Message Callback
> +
> + 
> +  The optional message_cb callback is called 
> whenever
> +  a logical decoding message has been decoded.
> +
> +typedef void (*LogicalDecodeMessageCB) (
> +struct LogicalDecodingContext *,
> +ReorderBufferTXN *txn,
> +XLogRecPtr message_lsn,
> +bool transactional,
> +const char *prefix,
> +Size message_size,
> +const char *message
> +);

We should at least document what txn is set to if not transactional.

> +void
> +logicalmsg_desc(StringInfo buf, XLogReaderState *record)
> +{
> + char   *rec = XLogRecGetData(record);
> + xl_logical_message *xlrec = (xl_logical_message *) rec;
> +
> + appendStringInfo(buf, "%s message size %zu bytes",
> +  xlrec->transactional ? "transactional" 
> : "nontransactional",
> +  xlrec->message_size);
> +}

Shouldn't we check
  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
  if XLogRecGetInfo(record) == XLOG_LOGICAL_MESSAGE
here?

> +const char *
> +logicalmsg_identify(uint8 info)
> +{
> + return NULL;
> +}

Huh?


> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr 
> lsn,
> +   bool transactional, const 
> char *prefix, Size msg_sz,
> +   const char *msg)
> +{
> + ReorderBufferTXN *txn = NULL;
> +
> + if (transactional)
> + {
> + ReorderBufferChange *change;
> +
> + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> +
> + Assert(xid != InvalidTransactionId);
> + Assert(txn != NULL);
> +
> + change = ReorderBufferGetChange(rb);
> + change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> + change->data.msg.transactional = true;
> + change->data.msg.prefix = pstrdup(prefix);
> + change->data.msg.message_size = msg_sz;
> + change->data.msg.message = palloc(msg_sz);
> + memcpy(change->data.msg.message, msg, msg_sz);
> +
> + ReorderBufferQueueChange(rb, xid, lsn, change);
> + }
> + else
> + {
> + rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
> + }
> +}


This approach prohibts catalog access when processing a nontransaction
message as there's no snapshot set up.

> + case REORDER_BUFFER_CHANGE_MESSAGE:
> + {
> + char   *data;
> + size_t  prefix_size = 
> strlen(change->data.msg.prefix) + 1;
> +
> + sz += prefix_size + 
> change->data.msg.message_size;
> + ReorderBufferSerializeReserve(rb, sz);
> +
> + data = ((char *) rb->outbuf) + 
> sizeof(ReorderBufferDiskChange);
> + memcpy(data, change->data.msg.prefix,
> +prefix_size);
> + memcpy(data + prefix_size, 
> change->data.msg.message,
> +change->data.msg.message_size);
> + break;
> + }
>   case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
>   {
>   Snapshotsnap;
> @@ -2354,6 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-24 Thread Petr Jelinek


Hello,

It seems that you forgot regression tests for test_decoding. There is an
entry in test_decoding/Makefile, but there are not files
sql/messages.sql and expected/messages.out. However they are included in
the first version of the patch.



Hi, yes, git add missing.

--
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 55a771c4770b73b5bedac7cd91bd0a50b0d6da45 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Wed, 24 Feb 2016 17:02:36 +0100
Subject: [PATCH] Logical Decoding Messages

---
 contrib/test_decoding/Makefile  |   2 +-
 contrib/test_decoding/expected/messages.out |  56 ++
 contrib/test_decoding/sql/messages.sql  |  17 +++
 contrib/test_decoding/test_decoding.c   |  22 +++-
 doc/src/sgml/func.sgml  |  42 
 doc/src/sgml/logicaldecoding.sgml   |  22 
 src/backend/access/rmgrdesc/Makefile|   4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c|  33 ++
 src/backend/access/transam/rmgr.c   |   1 +
 src/backend/replication/logical/Makefile|   2 +-
 src/backend/replication/logical/decode.c|  37 +++
 src/backend/replication/logical/logical.c   |  38 +++
 src/backend/replication/logical/logicalfuncs.c  |  27 +
 src/backend/replication/logical/message.c   | 132 
 src/backend/replication/logical/reorderbuffer.c |  73 +
 src/bin/pg_xlogdump/rmgrdesc.c  |   1 +
 src/include/access/rmgrlist.h   |   1 +
 src/include/catalog/pg_proc.h   |   4 +
 src/include/replication/logicalfuncs.h  |   2 +
 src/include/replication/message.h   |  42 
 src/include/replication/output_plugin.h |  13 +++
 src/include/replication/reorderbuffer.h |  21 
 22 files changed, 587 insertions(+), 5 deletions(-)
 create mode 100644 contrib/test_decoding/expected/messages.out
 create mode 100644 contrib/test_decoding/sql/messages.sql
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a362e69..8fdcfbc 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
-	binary prepared replorigin
+	binary prepared replorigin messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
new file mode 100644
index 000..d256851
--- /dev/null
+++ b/contrib/test_decoding/expected/messages.out
@@ -0,0 +1,56 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+--
+ init
+(1 row)
+
+SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
+ ?column? 
+--
+ msg1
+(1 row)
+
+SELECT 'msg2' FROM pg_logical_send_message(false, 'test', 'msg2');
+ ?column? 
+--
+ msg2
+(1 row)
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_send_message(true, 'test', 'msg3');
+ ?column? 
+--
+ msg3
+(1 row)
+
+SELECT 'msg4' FROM pg_logical_send_message(false, 'test', 'msg4');
+ ?column? 
+--
+ msg4
+(1 row)
+
+SELECT 'msg5' FROM pg_logical_send_message(true, 'test', 'msg5');
+ ?column? 
+--
+ msg5
+(1 row)
+
+COMMIT;
+SELECT regexp_replace(data, 'lsn: [^ ]+', '') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+   regexp_replace
+-
+ message:  transactional: 1 prefix: test, sz: 4 content:msg1
+ message:  transactional: 0 prefix: test, sz: 4 content:msg2
+ message:  transactional: 0 prefix: test, sz: 4 content:msg4
+ message:  transactional: 1 prefix: test, sz: 4 content:msg3
+ message:  transactional: 1 prefix: test, sz: 4 content:msg5
+(5 rows)
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+--
+ init
+(1 row)
+
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
new file mode 100644
index 000..7f462b8
--- /dev/null
+++ b/contrib/test_decoding/sql/messages.sql
@@ -0,0 +1,17 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_send_message(false, 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-20 Thread Artur Zakirov

On 23.01.2016 01:22, Petr Jelinek wrote:

Hi,

here is updated version of this patch, calling the messages logical
(decoding) messages consistently everywhere and removing any connection
to standby messages. Moving this to it's own module gave me place to
write some brief explanation about this so the code documentation has
hopefully improved as well.

The functionality itself didn't change.






Hello,

It seems that you forgot regression tests for test_decoding. There is an 
entry in test_decoding/Makefile, but there are not files 
sql/messages.sql and expected/messages.out. However they are included in 
the first version of the patch.


--
Artur Zakirov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-02-01 Thread Petr Jelinek
On 1 February 2016 at 09:45, Alexander Korotkov
 wrote:
> On Sat, Jan 30, 2016 at 11:58 AM, Simon Riggs  wrote:
>>
>> On 29 January 2016 at 21:11, Alexander Korotkov
>>  wrote:
>>>
>>> Should we think more about naming? Does two kinds of generic records
>>> confuse people?
>>
>>
>> Logical messages
>>
>> Generic WAL records
>>
>> Seems like I can tell them apart. Worth checking, but I think we're OK.
>
>
> I was worrying because topic name is "Generic WAL logical messages". But if
> we name them just "Logical messages" then it's OK for me.
>

Yeah the patch talks about logical messages, I use different title in
mailing list and CF to make it more clear on first sight what this
actually is technically.

-- 
  Petr Jelinek  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-01-30 Thread Simon Riggs
On 29 January 2016 at 21:11, Alexander Korotkov 
wrote:

> Hi, Petr!
>
> On Sat, Jan 23, 2016 at 1:22 AM, Petr Jelinek 
> wrote:
>
>> here is updated version of this patch, calling the messages logical
>> (decoding) messages consistently everywhere and removing any connection to
>> standby messages. Moving this to it's own module gave me place to write
>> some brief explanation about this so the code documentation has hopefully
>> improved as well.
>>
>> The functionality itself didn't change.
>
>
> I'd like to mention that there is my upcoming patch which is named generic
> WAL records.
> *http://www.postgresql.org/message-id/capphfdsxwzmojm6dx+tjnpyk27kt4o7ri6x_4oswcbyu1rm...@mail.gmail.com
> *
> But it has to be distinct feature from your generic WAL logical messages.
> Theoretically, we could have generic messages with arbitrary content and
> both having custom WAL reply function and being decoded by output plugin.
> But custom WAL reply function would let extension bug break recovery,
> archiving and physical replication. And that doesn't seem to be acceptable.
> This is why we have to develop these as separate features.
>
> Should we think more about naming? Does two kinds of generic records
> confuse people?
>

Logical messages

Generic WAL records

Seems like I can tell them apart. Worth checking, but I think we're OK.

-- 
Simon Riggshttp://www.2ndQuadrant.com/

PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-01-29 Thread Alexander Korotkov
Hi, Petr!

On Sat, Jan 23, 2016 at 1:22 AM, Petr Jelinek  wrote:

> here is updated version of this patch, calling the messages logical
> (decoding) messages consistently everywhere and removing any connection to
> standby messages. Moving this to it's own module gave me place to write
> some brief explanation about this so the code documentation has hopefully
> improved as well.
>
> The functionality itself didn't change.


I'd like to mention that there is my upcoming patch which is named generic
WAL records.
*http://www.postgresql.org/message-id/capphfdsxwzmojm6dx+tjnpyk27kt4o7ri6x_4oswcbyu1rm...@mail.gmail.com
*
But it has to be distinct feature from your generic WAL logical messages.
Theoretically, we could have generic messages with arbitrary content and
both having custom WAL reply function and being decoded by output plugin.
But custom WAL reply function would let extension bug break recovery,
archiving and physical replication. And that doesn't seem to be acceptable.
This is why we have to develop these as separate features.

Should we think more about naming? Does two kinds of generic records
confuse people?

--
Alexander Korotkov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-01-22 Thread Petr Jelinek

Hi,

here is updated version of this patch, calling the messages logical 
(decoding) messages consistently everywhere and removing any connection 
to standby messages. Moving this to it's own module gave me place to 
write some brief explanation about this so the code documentation has 
hopefully improved as well.


The functionality itself didn't change.

--
 Petr Jelinek  http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From c19d7274091baee4a52b1fa0ac684ace4cc9 Mon Sep 17 00:00:00 2001
From: Petr Jelinek 
Date: Fri, 22 Jan 2016 20:15:22 +0100
Subject: [PATCH] Logical Messages

---
 contrib/test_decoding/Makefile  |   2 +-
 contrib/test_decoding/test_decoding.c   |  22 +++-
 doc/src/sgml/func.sgml  |  42 
 doc/src/sgml/logicaldecoding.sgml   |  22 
 src/backend/access/rmgrdesc/Makefile|   4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c|  33 ++
 src/backend/access/transam/rmgr.c   |   1 +
 src/backend/replication/logical/Makefile|   2 +-
 src/backend/replication/logical/decode.c|  37 +++
 src/backend/replication/logical/logical.c   |  38 +++
 src/backend/replication/logical/logicalfuncs.c  |  27 +
 src/backend/replication/logical/message.c   | 132 
 src/backend/replication/logical/reorderbuffer.c |  73 +
 src/bin/pg_xlogdump/rmgrdesc.c  |   1 +
 src/include/access/rmgrlist.h   |   1 +
 src/include/catalog/pg_proc.h   |   4 +
 src/include/replication/logicalfuncs.h  |   2 +
 src/include/replication/message.h   |  42 
 src/include/replication/output_plugin.h |  13 +++
 src/include/replication/reorderbuffer.h |  21 
 20 files changed, 514 insertions(+), 5 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a362e69..8fdcfbc 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
-	binary prepared replorigin
+	binary prepared replorigin messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 4cf808f..f655355 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
 
 #include "replication/output_plugin.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 
 #include "utils/builtins.h"
@@ -63,11 +64,15 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
  ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
  RepOriginId origin_id);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+			  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+			  bool transactional, const char *prefix,
+			  Size sz, const char *message);
 
 void
 _PG_init(void)
 {
-	/* other plugins can perform things here */
+	RegisterLogicalMsgPrefix("test");
 }
 
 /* specify output plugin callbacks */
@@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
+	cb->message_cb = pg_decode_message;
 }
 
 
@@ -471,3 +477,17 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	OutputPluginWrite(ctx, true);
 }
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+  ReorderBufferTXN *txn, XLogRecPtr lsn,
+  bool transactional, const char *prefix,
+  Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "message: lsn: %X/%X transactional: %d prefix: %s, sz: %zu content:",
+	 (uint32)(lsn >> 32), (uint32)lsn, transactional, prefix,
+	 sz);
+	appendBinaryStringInfo(ctx->out, message, sz);
+	OutputPluginWrite(ctx, true);
+}
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4d2b88f..d55a2b1 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17639,6 +17639,48 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());

   
 
+  
+   
+
+ pg_logical_send_message
+
+pg_logical_send_message(transactional bool, prefix text, content text)
+   
+   
+void
+   
+   
+Write text logical decoding message. This can be 

Re: [HACKERS] Proposal: Generic WAL logical messages

2016-01-07 Thread Petr Jelinek

On 2016-01-07 17:22, Simon Riggs wrote:


* You call them "logical messages" here, but standby messages in code.
But they only apply to logical decoding, so "logical message" seems a
better name. Can we avoid calling them "messages" cos that will get
confusing.



Yes it's slightly confusing, the "Standby" in the code is mostly for 
consistency with other "Standby*" stuff in neighbouring code, but that 
can be changed. I don't have better name than "messages" though, 
"records" is too generic.



For standard WAL reply, these are basically noop


We should document that.


Okay.



These messages can be both transactional (decoded on commit) or
non-transactional (decoded immediately). Each message has prefix to
differentiate between individual plugins. The prefix has to be
registered exactly once (in similar manner as security label
providers) to avoid conflicts between plugins.


I'm not sure what "transactional" means, nor is that documented.
(Conversely, I think "immediate" fairly clear)

Are they fired only on commit? (Guess so)
Are they fired in the original order, if multiple messages in same
transaction? (Hope so)
Are they fired as they come in the original message sequence, or before
anything else or after everything else? For example, cache invalidation
messages are normally fired right at the end of a transaction, no matter
when they were triggered.



Transnational message is added to the stream same way as any DML 
operation is and has same properties (processed on commit, original 
order, duplicate messages are delivered as they are).


The immediate as you say is obvious, they get to logical decoding 
immediately without dealing with any regards to what's happening around 
(wal order is still preserved though).


Will make this clearer in the docs.

--
 Petr Jelinek  http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


Re: [HACKERS] Proposal: Generic WAL logical messages

2016-01-07 Thread Andres Freund
> Process-wise, I don't understand why is Andres mentioned as co-author.
> Did he actually wrote part of the patch, or advised on the design?
> Who is reviewing the patch?

It's extracted & extended from BDR, where I added that feature (to
implement distributed locking).


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers


[HACKERS] Proposal: Generic WAL logical messages

2015-12-31 Thread Petr Jelinek

Hello,

I would like to introduce concept of generic WAL logical messages.

These messages just create WAL record with arbitrary data as specified 
by user. For standard WAL reply, these are basically noop, but in 
logical decoding they are be decoded and the special callback of the 
output plugin is be called for them.


These messages can be both transactional (decoded on commit) or 
non-transactional (decoded immediately). Each message has prefix to 
differentiate between individual plugins. The prefix has to be 
registered exactly once (in similar manner as security label providers) 
to avoid conflicts between plugins.


There are three main use-cases for these:
a) reliable communication between two nodes in multi-master setup - WAL 
already handles correctly the recovery etc so we don't have to invent 
new complex code to handle crashes in middle of operations


b) out of order messaging in logical replication - if you want to send 
something to other node immediately without having to wait for commit 
for example to acquire remote lock, this can be used for that


c) "queue tables" - combination of this feature (there is SQL interface) 
and before triggers can be used to create tables for which all inserts 
only go to the WAL so they can behave like queue without having to store 
the data in the table itself (kind of the opposite of unlogged tables)


Initial implementation of this proposal is attached.

--
 Petr Jelinek  http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services


WAL-Messages-2016-01-01.patch
Description: binary/octet-stream

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers