I have checked the 0001 and 0003 patches. (I think we have dismissed
the approach in 0002 for now. And let's talk about 0004 later.)
I have attached a few fixup patches, described further below.
# 0001
The argument "create" for fill_seq_with_data() is always true (and
patch 0002 removes it). So I'm not sure if it's needed. If it is, it
should be documented somewhere.
About the comment you added in nextval_internal(): It's a good
explanation, so I would leave it in. I also agree with the
conclusion of the comment that the current solution is reasonable. We
probably don't need the same comment again in fill_seq_with_data() and
again in do_setval(). Note that both of the latter functions already
point to nextval_internal() for other comments, so the same can be
relied upon here as well.
The order of the new fields sequence_cb and stream_sequence_cb is a
bit inconsistent compared to truncate_cb and stream_truncate_cb.
Maybe take another look to make the order more uniform.
Some documentation needs to be added to the Logical Decoding chapter.
I have attached a patch that I think catches all the places that need
to be updated, with some details left for you to fill in. ;-) The
ordering of some of the documentation chunks reflects the order in
which the callbacks appear in the header files, which might not be
optimal; see above.
In reorderbuffer.c, you left a comment about how to access a sequence
tuple. There is an easier way, using Form_pg_sequence_data, which is
how sequence.c also does it. See attached patch.
# 0003
The tests added in 0003 don't pass for me. It appears that you might
have forgotten to update the expected files after you added some tests
or something. The cfbot shows the same. See attached patch for a
correction, but do check what your intent was.
As mentioned before, we probably don't need the skip-sequences option
in test_decoding.
From 2dd9a0e76b9496016e7abebebd0006ddee72d3c1 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Tue, 7 Dec 2021 14:19:42 +0100
Subject: [PATCH 1/4] Whitespace fixes
---
contrib/test_decoding/expected/sequence.out | 6 +++---
contrib/test_decoding/sql/sequence.sql | 6 +++---
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/contrib/test_decoding/expected/sequence.out
b/contrib/test_decoding/expected/sequence.out
index 17c88990b1..ca55bcebc3 100644
--- a/contrib/test_decoding/expected/sequence.out
+++ b/contrib/test_decoding/expected/sequence.out
@@ -177,7 +177,7 @@ SELECT data FROM
pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
------
(0 rows)
--- rollback on table creation with serial column
+-- rollback on table creation with serial column
BEGIN;
CREATE TABLE test_table (a SERIAL, b INT);
INSERT INTO test_table (b) VALUES (100);
@@ -202,7 +202,7 @@ SELECT data FROM
pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
------
(0 rows)
--- rollback on table with serial column
+-- rollback on table with serial column
CREATE TABLE test_table (a SERIAL, b INT);
BEGIN;
INSERT INTO test_table (b) VALUES (100);
@@ -222,7 +222,7 @@ INSERT INTO test_table (b) VALUES (700);
INSERT INTO test_table (b) VALUES (800);
INSERT INTO test_table (b) VALUES (900);
ROLLBACK;
--- check table and sequence values after rollback
+-- check table and sequence values after rollback
SELECT * from test_table_a_seq;
last_value | log_cnt | is_called
------------+---------+-----------
diff --git a/contrib/test_decoding/sql/sequence.sql
b/contrib/test_decoding/sql/sequence.sql
index d8a34738f3..21c4b79222 100644
--- a/contrib/test_decoding/sql/sequence.sql
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -48,7 +48,7 @@ CREATE TABLE test_table (a INT);
ROLLBACK;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
--- rollback on table creation with serial column
+-- rollback on table creation with serial column
BEGIN;
CREATE TABLE test_table (a SERIAL, b INT);
INSERT INTO test_table (b) VALUES (100);
@@ -65,7 +65,7 @@ CREATE TABLE test_table (a SERIAL, b INT);
ROLLBACK;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '1', 'skip-sequences', '0');
--- rollback on table with serial column
+-- rollback on table with serial column
CREATE TABLE test_table (a SERIAL, b INT);
BEGIN;
@@ -82,7 +82,7 @@ CREATE TABLE test_table (a SERIAL, b INT);
INSERT INTO test_table (b) VALUES (900);
ROLLBACK;
--- check table and sequence values after rollback
+-- check table and sequence values after rollback
SELECT * from test_table_a_seq;
SELECT nextval('test_table_a_seq');
--
2.34.1
From c5e9eb2792bd276e493aed582627e93b4c681374 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Tue, 7 Dec 2021 15:29:34 +0100
Subject: [PATCH 2/4] Make tests pass
---
contrib/test_decoding/expected/sequence.out | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/contrib/test_decoding/expected/sequence.out
b/contrib/test_decoding/expected/sequence.out
index ca55bcebc3..d94e185fb9 100644
--- a/contrib/test_decoding/expected/sequence.out
+++ b/contrib/test_decoding/expected/sequence.out
@@ -260,13 +260,15 @@ ROLLBACK TO SAVEPOINT a;
DROP TABLE test_table;
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
- data
---------------------------------------------------------------
+ data
+-------------------------------------------------------------------------------------------------
BEGIN
+ sequence public.test_table_a_seq: transactional:1 created:1 last_value:1
log_cnt:0 is_called:0
+ sequence public.test_table_a_seq: transactional:1 created:0 last_value:33
log_cnt:0 is_called:1
table public.test_table: INSERT: a[integer]:1 b[integer]:100
table public.test_table: INSERT: a[integer]:2 b[integer]:200
COMMIT
-(4 rows)
+(6 rows)
-- savepoint test on table with serial column
BEGIN;
@@ -307,11 +309,15 @@ SELECT * FROM test_sequence;
DROP SEQUENCE test_sequence;
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL,
'include-xids', '0', 'skip-empty-xacts', '0', 'skip-sequences', '0');
- data
---------
+ data
+------------------------------------------------------------------------------------------------
BEGIN
+ sequence public.test_sequence: transactional:1 created:1 last_value:1
log_cnt:0 is_called:0
+ sequence public.test_sequence: transactional:1 created:0 last_value:33
log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:1 created:0 last_value:3000
log_cnt:0 is_called:1
+ sequence public.test_sequence: transactional:1 created:0 last_value:3033
log_cnt:0 is_called:1
COMMIT
-(2 rows)
+(6 rows)
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
--
2.34.1
From 0338de62cd955b98244b5b624a2bcb7af76465b0 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Tue, 7 Dec 2021 15:29:51 +0100
Subject: [PATCH 3/4] Some documentation updates
---
doc/src/sgml/logicaldecoding.sgml | 54 +++++++++++++++++++++++++++++--
1 file changed, 51 insertions(+), 3 deletions(-)
diff --git a/doc/src/sgml/logicaldecoding.sgml
b/doc/src/sgml/logicaldecoding.sgml
index b6353c7a12..eea4f3cd50 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -458,6 +458,7 @@ <title>Initialization Function</title>
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeSequenceCB sequence_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
@@ -472,6 +473,7 @@ <title>Initialization Function</title>
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamSequenceCB stream_sequence_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
@@ -481,9 +483,12 @@ <title>Initialization Function</title>
and <function>commit_cb</function> callbacks are required,
while <function>startup_cb</function>,
<function>filter_by_origin_cb</function>,
<function>truncate_cb</function>,
+ <function>sequence_cb</function>,
and <function>shutdown_cb</function> are optional.
If <function>truncate_cb</function> is not set but a
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
+ Similarly, if <function>sequence_cb</function> is not set and a sequence
+ change is to be decoded, the action will be ignored.
</para>
<para>
@@ -492,7 +497,7 @@ <title>Initialization Function</title>
<function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
<function>stream_commit_cb</function>,
<function>stream_change_cb</function>,
and <function>stream_prepare_cb</function>
- are required, while <function>stream_message_cb</function> and
+ are required, while <function>stream_message_cb</function>,
<function>stream_sequence_cb</function>, and
<function>stream_truncate_cb</function> are optional.
</para>
@@ -808,6 +813,27 @@ <title>Generic Message Callback</title>
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-sequence">
+ <title>Sequence Callback</title>
+
+ <para>
+ The <function>sequence_cb</function> callback is called for actions that
+ change a sequence.
+<programlisting>
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ bool created,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+</programlisting>
+ TODO
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-filter-prepare">
<title>Prepare Filter Callback</title>
@@ -1017,6 +1043,28 @@ <title>Stream Message Callback</title>
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-stream-sequence">
+ <title>Stream Sequence Callback</title>
+ <para>
+ The <function>stream_sequence_cb</function> callback is called for
+ actions that change a sequence in a block of streamed changes
+ (demarcated by <function>stream_start_cb</function> and
+ <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext
*ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ bool created,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+</programlisting>
+ TODO
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-stream-truncate">
<title>Stream Truncate Callback</title>
<para>
@@ -1197,8 +1245,8 @@ <title>Streaming of Large Transactions for Logical
Decoding</title>
in-progress transactions. There are multiple required streaming callbacks
(<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
<function>stream_abort_cb</function>, <function>stream_commit_cb</function>
- and <function>stream_change_cb</function>) and two optional callbacks
- (<function>stream_message_cb</function> and
<function>stream_truncate_cb</function>).
+ and <function>stream_change_cb</function>) and multiple optional callbacks
+ (<function>stream_message_cb</function>,
<function>stream_sequence_cb</function>, and
<function>stream_truncate_cb</function>).
Also, if streaming of two-phase commands is to be supported, then
additional
callbacks must be provided. (See <xref
linkend="logicaldecoding-two-phase-commits"/>
for details).
--
2.34.1
From 9f29fac7e479c255fb247d201fe8691655c11fbf Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pe...@eisentraut.org>
Date: Tue, 7 Dec 2021 15:31:36 +0100
Subject: [PATCH 4/4] Simplify accessing sequence tuple data
---
.../replication/logical/reorderbuffer.c | 37 ++++---------------
1 file changed, 8 insertions(+), 29 deletions(-)
diff --git a/src/backend/replication/logical/reorderbuffer.c
b/src/backend/replication/logical/reorderbuffer.c
index 434926459f..b4fa16bebe 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -120,6 +120,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
+#include "commands/sequence.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1089,9 +1090,7 @@ ReorderBufferQueueSequence(ReorderBuffer *rb,
TransactionId xid,
{
Relation relation;
HeapTuple tuple;
- bool isnull;
- int64 last_value, log_cnt;
- bool is_called;
+ Form_pg_sequence_data seq;
Oid reloid;
if (using_subtxn)
@@ -1108,19 +1107,10 @@ ReorderBufferQueueSequence(ReorderBuffer *rb,
TransactionId xid,
relation = RelationIdGetRelation(reloid);
tuple = &tuplebuf->tuple;
-
- /*
- * Extract the internal sequence values, describing the
state.
- *
- * XXX Seems a bit strange to access it directly. Maybe
there's
- * a better / more correct way?
- */
- last_value = heap_getattr(tuple, 1,
RelationGetDescr(relation), &isnull);
- log_cnt = heap_getattr(tuple, 2,
RelationGetDescr(relation), &isnull);
- is_called = heap_getattr(tuple, 3,
RelationGetDescr(relation), &isnull);
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
rb->sequence(rb, txn, lsn, relation, transactional,
created,
- last_value, log_cnt,
is_called);
+ seq->last_value, seq->log_cnt,
seq->is_called);
RelationClose(relation);
@@ -2249,31 +2239,20 @@ ReorderBufferApplySequence(ReorderBuffer *rb,
ReorderBufferTXN *txn,
bool streaming)
{
HeapTuple tuple;
- bool isnull;
- int64 last_value, log_cnt;
- bool is_called;
+ Form_pg_sequence_data seq;
tuple = &change->data.sequence.tuple->tuple;
-
- /*
- * Extract the internal sequence values, describing the state.
- *
- * XXX Seems a bit strange to access it directly. Maybe there's
- * a better / more correct way?
- */
- last_value = heap_getattr(tuple, 1, RelationGetDescr(relation),
&isnull);
- log_cnt = heap_getattr(tuple, 2, RelationGetDescr(relation), &isnull);
- is_called = heap_getattr(tuple, 3, RelationGetDescr(relation), &isnull);
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
/* Only ever called from ReorderBufferApplySequence, so transational. */
if (streaming)
rb->stream_sequence(rb, txn, change->lsn, relation, true,
change->data.sequence.created,
- last_value, log_cnt,
is_called);
+ seq->last_value,
seq->log_cnt, seq->is_called);
else
rb->sequence(rb, txn, change->lsn, relation, true,
change->data.sequence.created,
- last_value, log_cnt, is_called);
+ seq->last_value, seq->log_cnt,
seq->is_called);
}
/*
--
2.34.1