Dear hackers,

> It is correct that we can make a wrong decision about whether a change
> is transactional or non-transactional when sequence DDL happens before
> the SNAPBUILD_FULL_SNAPSHOT state and the sequence operation happens
> after that state.

I found a workload for which the decoder makes the wrong decision.

# Prerequisite

Apply the attached patch, which logs the sequence-decoding status. It applies
on top of the v20231203 patch set.
Also, a table and a sequence must be defined:

```
CREATE TABLE foo (var int);
CREATE SEQUENCE s;
```

# Workload

Then run concurrent transactions from three clients as shown below:

Client-1

BEGIN;
INSERT INTO foo VALUES (1);

                        Client-2

                        SELECT pg_create_logical_replication_slot('slot', 
'test_decoding');

                                                Client-3

                                                BEGIN;
                                                ALTER SEQUENCE s MAXVALUE 5000;
COMMIT;
                                                SAVEPOINT s1;
                                                SELECT setval('s', 2000);
                                                ROLLBACK;

                        SELECT pg_logical_slot_get_changes('slot', 
'test_decoding');

# Result and analysis

First, the lines below are written to the log. They show that the WAL records
for ALTER SEQUENCE were decoded but skipped, because the snapshot was still
being built (SNAPBUILD_BUILDING_SNAPSHOT).

```
...
LOG:  logical decoding found initial starting point at 0/154D238
DETAIL:  Waiting for transactions (approximately 1) older than 741 to end.
STATEMENT:  SELECT * FROM pg_create_logical_replication_slot('slot', 
'test_decoding');
LOG:  XXX: smgr_decode. snapshot is SNAPBUILD_BUILDING_SNAPSHOT
STATEMENT:  SELECT * FROM pg_create_logical_replication_slot('slot', 
'test_decoding');
LOG:  XXX: skipped
STATEMENT:  SELECT * FROM pg_create_logical_replication_slot('slot', 
'test_decoding');
LOG:  XXX: seq_decode. snapshot is SNAPBUILD_BUILDING_SNAPSHOT
STATEMENT:  SELECT * FROM pg_create_logical_replication_slot('slot', 
'test_decoding');
LOG:  XXX: skipped
...
```

Note that the `seq_decode...` line above was not emitted for `setval()`; it
comes from the ALTER SEQUENCE statement. Below is the call stack that inserts
that WAL record.

```
XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
fill_seq_fork_with_data
fill_seq_with_data
AlterSequence
```

The subsequent log lines are shown below. They show that the snapshot has
reached SNAPBUILD_FULL_SNAPSHOT and that `setval()` is wrongly regarded as
non-transactional.

```
LOG:  logical decoding found initial consistent point at 0/154D658
DETAIL:  Waiting for transactions (approximately 1) older than 742 to end.
STATEMENT:  SELECT * FROM pg_create_logical_replication_slot('slot', 
'test_decoding');
LOG:  XXX: seq_decode. snapshot is SNAPBUILD_FULL_SNAPSHOT
STATEMENT:  SELECT * FROM pg_create_logical_replication_slot('slot', 
'test_decoding');
LOG:  XXX: the sequence is non-transactional
STATEMENT:  SELECT * FROM pg_create_logical_replication_slot('slot', 
'test_decoding');
LOG:  XXX: not consistent: skipped
```

The change is then discarded by the code below, because the snapshot is not
yet CONSISTENT. Had the change been treated as transactional, we would have
queued it, although the transaction would still be skipped at commit.

```
        else if (!transactional &&
                         (SnapBuildCurrentState(builder) != 
SNAPBUILD_CONSISTENT ||
                          SnapBuildXactNeedsSkip(builder, buf->origptr)))
                return;
```

In any case, this shows a scenario in which we make the wrong decision. This
example happens to be harmless (nothing incorrect is output), but I am not
sure that holds for every such case.

Best Regards,
Hayato Kuroda
FUJITSU LIMITED

diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c
index d48d88081f..73e38cafd8 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -1397,12 +1397,17 @@ seq_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 
        ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
 
+       elog(LOG, "XXX: seq_decode. snapshot is %s", 
SnapBuildIdentify(builder));
+
        /*
         * If we don't have snapshot or we are just fast-forwarding, there is no
         * point in decoding sequences.
         */
        if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+       {
+               elog(LOG, "XXX: skipped");
                return;
+       }
 
        /* only interested in our database */
        XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
@@ -1437,14 +1442,22 @@ seq_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
                                                                                
                                 target_locator,
                                                                                
                                 NULL);
 
+       elog(LOG, "XXX: the sequence is %s", transactional ? "transactional" : 
"non-transactional");
+
        /* Skip the change if already processed (per the snapshot). */
        if (transactional &&
                !SnapBuildProcessChange(builder, xid, buf->origptr))
+       {
+               elog(LOG, "XXX: already decoded: skipped");
                return;
+       }
        else if (!transactional &&
                         (SnapBuildCurrentState(builder) != 
SNAPBUILD_CONSISTENT ||
                          SnapBuildXactNeedsSkip(builder, buf->origptr)))
+       {
+               elog(LOG, "XXX: not consistent: skipped");
                return;
+       }
 
        /*
         * We also skip decoding in fast_forward mode. This check must be last
@@ -1527,13 +1540,18 @@ smgr_decode(LogicalDecodingContext *ctx, 
XLogRecordBuffer *buf)
 
        ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
 
+       elog(LOG, "XXX: smgr_decode. snapshot is %s", 
SnapBuildIdentify(builder));
+
        /*
         * If we don't have snapshot or we are just fast-forwarding, there is no
         * point in decoding relfilenode information.
         */
        if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
                ctx->fast_forward)
+       {
+               elog(LOG, "XXX: skipped");
                return;
+       }
 
        /* only interested in our database */
        xlrec = (xl_smgr_create *) XLogRecGetData(r);
diff --git a/src/backend/replication/logical/snapbuild.c 
b/src/backend/replication/logical/snapbuild.c
index fec190a8b2..efb2afb09d 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2135,3 +2135,29 @@ CheckPointSnapBuild(void)
        }
        FreeDir(snap_dir);
 }
+
+/*
+ * SnapBuildIdentify
+ *		Return the name of the builder's current SnapBuildState.
+ *
+ * Debugging aid for the "XXX" elog(LOG) instrumentation in seq_decode() and
+ * smgr_decode(); the result is always a string constant, never NULL.
+ */
+char *
+SnapBuildIdentify(SnapBuild *builder)
+{
+       switch (builder->state)
+       {
+               case SNAPBUILD_START:
+                       return "SNAPBUILD_START";
+               case SNAPBUILD_BUILDING_SNAPSHOT:
+                       return "SNAPBUILD_BUILDING_SNAPSHOT";
+               case SNAPBUILD_FULL_SNAPSHOT:
+                       return "SNAPBUILD_FULL_SNAPSHOT";
+               case SNAPBUILD_CONSISTENT:
+                       return "SNAPBUILD_CONSISTENT";
+       }
+
+       /* No default above so -Wswitch flags new states; never hand NULL to %s. */
+       return "unknown";
+}
diff --git a/src/include/replication/snapbuild.h 
b/src/include/replication/snapbuild.h
index 63f50a13d6..bf257f5f5a 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -91,4 +91,6 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, 
XLogRecPtr lsn,
                                                                                
 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
 
+extern char *SnapBuildIdentify(SnapBuild *builder);
+
 #endif                                                 /* SNAPBUILD_H */

Reply via email to