Hi,

Here are some comments on the v21 patch.

1.
+                       WalSndKeepalive(false, 0);

Maybe we can use InvalidXLogRecPtr here, instead of 0.

2.
+       pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);

Similarly, should we use XLogRecPtrIsInvalid()?

3.
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, 
ReorderBufferTXN *txn,
                        Assert(false);
        }
 
+   if (in_streaming)
+       {
+               /* If streaming, send STREAM START if we haven't yet */
+               if (txndata && !txndata->sent_stream_start)
+               pgoutput_send_stream_start(ctx, txn);
+       }
+       else
+       {
+               /* If not streaming, send BEGIN if we haven't yet */
+               if (txndata && !txndata->sent_begin_txn)
+               pgoutput_send_begin(ctx, txn);
+       }
+
+
        /* Avoid leaking memory by using and resetting our own context */
        old = MemoryContextSwitchTo(data->context);


I am not sure if it is suitable to send begin or stream_start here, because the
row filter is not checked yet. That means, empty transactions caused by row
filter are not skipped.

4.
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
                                                        ReorderBufferTXN *txn,
                                                        XLogRecPtr prepare_lsn)
 {
+       PGOutputTxnData *txndata = txn->output_plugin_private;
+       bool                    sent_begin_txn = txndata->sent_begin_txn;
+
        Assert(rbtxn_is_streamed(txn));
 
-       OutputPluginUpdateProgress(ctx);
+       pfree(txndata);
+       txn->output_plugin_private = NULL;
+
+       if (!sent_begin_txn)
+       {
+               elog(DEBUG1, "Skipping replication of an empty transaction in 
stream prepare");
+               return;
+       }
+
+       OutputPluginUpdateProgress(ctx, false);
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
        OutputPluginWrite(ctx, true);

I notice that the patch skips stream prepared transaction, this would cause an
error on subscriber side when committing this transaction on publisher side, so
I think we'd better not do that.

For example:
(set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
postgresql.conf)

-- publisher
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create publication pub for table test;

-- subscriber 
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create subscription sub connection 'dbname=postgres port=5432' publication pub 
with(two_phase=on, streaming=on);

-- publisher
begin;
INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
prepare transaction 't';
commit prepared 't';

The error message in subscriber log:
ERROR:  prepared transaction with identifier "pg_gid_16391_722" does not exist


Regards,
Shi yu

Reply via email to