After manually applying the patch, I regenerated the diff; it is attached.

On 2020-02-18 4:16 p.m., David Zhang wrote:
1. I tried to apply the patch to PG 12.2 commit 
45b88269a353ad93744772791feb6d01bc7e1e42 (HEAD -> REL_12_2, tag: REL_12_2), but it 
did not apply. I then checked the patch and got the errors shown below.
$ git apply --check 
0001-Fastpath-for-sending-changes-to-output-plugin-in-log.patch
error: patch failed: contrib/test_decoding/logical.conf:1
error: contrib/test_decoding/logical.conf: patch does not apply
error: patch failed: src/backend/replication/logical/reorderbuffer.c:1133
error: src/backend/replication/logical/reorderbuffer.c: patch does not apply

2. A further check of the file "logical.conf" showed that it has had only one commit 
since 2014, and that commit does not contain the parameter "logical_decoding_work_mem = 64kB".

3. I manually applied the patch, including the changes to 
src/backend/replication/logical/reorderbuffer.c, and then ran a simple logical 
replication test. It hit the connection issue shown below:
"table public.pgbench_accounts: INSERT: aid[integer]:4071 bid[integer]:1 
abalance[integer]:0 filler[character]:'                                              
                                      '
pg_recvlogical: error: could not receive data from WAL stream: server closed 
the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
pg_recvlogical: disconnected; waiting 5 seconds to try again"

4. This connection issue can be reproduced on the PG 12.2 commit mentioned above. 
The basic steps are:
4.1 Change "wal_level = logical" in "postgresql.conf"
4.2 Create a logical slot and listen on it:
$ pg_recvlogical -d postgres --slot test --create-slot
$ pg_recvlogical -d postgres --slot test --start -f -

4.3 From another terminal, run the command below:
$ pgbench -i -p 5432 -d postgres

Let me know if I did something wrong. If a new patch is available, I can 
re-run the test in the same environment.

--
David

Software Engineer
Highgo Software Inc. (Canada)
www.highgo.ca
diff --git a/contrib/test_decoding/logical.conf 
b/contrib/test_decoding/logical.conf
index 367f706651..02595d99d5 100644
--- a/contrib/test_decoding/logical.conf
+++ b/contrib/test_decoding/logical.conf
@@ -1,2 +1,3 @@
 wal_level = logical
 max_replication_slots = 4
+logical_decoding_work_mem = 64MB
diff --git a/src/backend/replication/logical/reorderbuffer.c 
b/src/backend/replication/logical/reorderbuffer.c
index a74fd705b4..62b661dc4d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -989,11 +989,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
                        nr_txns++;
        }
 
-       /*
-        * TODO: Consider adding fastpath for the rather common nr_txns=1 case, 
no
-        * need to allocate/build a heap then.
-        */
-
        /* allocate iteration state */
        state = (ReorderBufferIterTXNState *)
                MemoryContextAllocZero(rb->context,
@@ -1009,10 +1004,11 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
                state->entries[off].segno = 0;
        }
 
-       /* allocate heap */
-       state->heap = binaryheap_allocate(state->nr_txns,
-                                                                         
ReorderBufferIterCompare,
-                                                                         
state);
+       /* allocate heap, if we have more than one transaction. */
+       if (nr_txns > 1)
+               state->heap = binaryheap_allocate(state->nr_txns,
+                                                                               
  ReorderBufferIterCompare,
+                                                                               
  state);
 
        /* Now that the state fields are initialized, it is safe to return it. 
*/
        *iter_state = state;
@@ -1044,7 +1040,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
                state->entries[off].change = cur_change;
                state->entries[off].txn = txn;
 
-               binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+               /* add to heap, only if we have more than one transaction. */
+               if (nr_txns > 1)
+                       binaryheap_add_unordered(state->heap, 
Int32GetDatum(off++));
        }
 
        /* add subtransactions if they contain changes */
@@ -1073,12 +1071,15 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, 
ReorderBufferTXN *txn,
                        state->entries[off].change = cur_change;
                        state->entries[off].txn = cur_txn;
 
-                       binaryheap_add_unordered(state->heap, 
Int32GetDatum(off++));
+                       /* add to heap, only if we have more than one 
transaction. */
+                       if (nr_txns > 1)
+                               binaryheap_add_unordered(state->heap, 
Int32GetDatum(off++));
                }
        }
 
        /* assemble a valid binary heap */
-       binaryheap_build(state->heap);
+       if (nr_txns > 1)
+               binaryheap_build(state->heap);
 }
 
 /*
@@ -1094,11 +1095,24 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, 
ReorderBufferIterTXNState *state)
        ReorderBufferIterTXNEntry *entry;
        int32           off;
 
-       /* nothing there anymore */
-       if (state->heap->bh_size == 0)
-               return NULL;
+       /*
+        * If there is only one transaction then it will be at the offset 0.
+        * Otherwise get the offset from the binary heap.
+        */
+       if (state->nr_txns == 1)
+       {
+               off = 0;
+               if (state->entries[off].change == NULL)
+                       return NULL;
+       }
+       else
+       {
+               /* nothing there anymore */
+               if (state->heap->bh_size == 0)
+                       return NULL;
+               off = DatumGetInt32(binaryheap_first(state->heap));
+       }
 
-       off = DatumGetInt32(binaryheap_first(state->heap));
        entry = &state->entries[off];
 
        /* free memory we might have "leaked" in the previous *Next call */
@@ -1128,7 +1142,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, 
ReorderBufferIterTXNState *state)
                state->entries[off].lsn = next_change->lsn;
                state->entries[off].change = next_change;
 
-               binaryheap_replace_first(state->heap, Int32GetDatum(off));
+               if (state->nr_txns > 1)
+                       binaryheap_replace_first(state->heap, 
Int32GetDatum(off));
+
                return change;
        }
 
@@ -1165,7 +1181,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, 
ReorderBufferIterTXNState *state)
        }
 
        /* ok, no changes there anymore, remove */
-       binaryheap_remove_first(state->heap);
+       if (state->nr_txns > 1)
+               binaryheap_remove_first(state->heap);
+       else
+               entry->change = NULL;
 
        return change;
 }
@@ -1196,7 +1215,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
                Assert(dlist_is_empty(&state->old_change));
        }
 
-       binaryheap_free(state->heap);
+       if (state->nr_txns > 1)
+               binaryheap_free(state->heap);
        pfree(state);
 }
 

Reply via email to