After manually applying the patch, I regenerated the diff; it is attached.
On 2020-02-18 4:16 p.m., David Zhang wrote:
1. I tried to apply the patch to PG 12.2, commit 45b88269a353ad93744772791feb6d01bc7e1e42 (HEAD -> REL_12_2, tag: REL_12_2), but it does not apply. I then checked the patch and got the errors shown below. $ git apply --check 0001-Fastpath-for-sending-changes-to-output-plugin-in-log.patch error: patch failed: contrib/test_decoding/logical.conf:1 error: contrib/test_decoding/logical.conf: patch does not apply error: patch failed: src/backend/replication/logical/reorderbuffer.c:1133 error: src/backend/replication/logical/reorderbuffer.c: patch does not apply 2. I looked further into the file "logical.conf" and found that it has had only one commit since 2014, and that commit does not contain the parameter "logical_decoding_work_mem = 64kB". 3. I manually applied the patch, including the changes to src/backend/replication/logical/reorderbuffer.c, and then ran a simple logical replication test. I hit a connection issue, shown below: "table public.pgbench_accounts: INSERT: aid[integer]:4071 bid[integer]:1 abalance[integer]:0 filler[character]:' ' pg_recvlogical: error: could not receive data from WAL stream: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. pg_recvlogical: disconnected; waiting 5 seconds to try again" 4. This connection issue can be reproduced on the PG 12.2 commit mentioned above. The basic steps are: 4.1 Set "wal_level = logical" in "postgresql.conf". 4.2 Create a logical slot and listen on it: $ pg_recvlogical -d postgres --slot test --create-slot $ pg_recvlogical -d postgres --slot test --start -f - 4.3 From another terminal, run the command below: $ pgbench -i -p 5432 -d postgres Let me know if I did something wrong. If a new patch is available, I can re-run the test in the same environment.
-- David Software Engineer Highgo Software Inc. (Canada) www.highgo.ca
diff --git a/contrib/test_decoding/logical.conf b/contrib/test_decoding/logical.conf index 367f706651..02595d99d5 100644 --- a/contrib/test_decoding/logical.conf +++ b/contrib/test_decoding/logical.conf @@ -1,2 +1,3 @@ wal_level = logical max_replication_slots = 4 +logical_decoding_work_mem = 64MB diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a74fd705b4..62b661dc4d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -989,11 +989,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, nr_txns++; } - /* - * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no - * need to allocate/build a heap then. - */ - /* allocate iteration state */ state = (ReorderBufferIterTXNState *) MemoryContextAllocZero(rb->context, @@ -1009,10 +1004,11 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].segno = 0; } - /* allocate heap */ - state->heap = binaryheap_allocate(state->nr_txns, - ReorderBufferIterCompare, - state); + /* allocate heap, if we have more than one transaction. */ + if (nr_txns > 1) + state->heap = binaryheap_allocate(state->nr_txns, + ReorderBufferIterCompare, + state); /* Now that the state fields are initialized, it is safe to return it. */ *iter_state = state; @@ -1044,7 +1040,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + /* add to heap, only if we have more than one transaction. 
*/ + if (nr_txns > 1) + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } /* add subtransactions if they contain changes */ @@ -1073,12 +1071,15 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = cur_txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + /* add to heap, only if we have more than one transaction. */ + if (nr_txns > 1) + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); } } /* assemble a valid binary heap */ - binaryheap_build(state->heap); + if (nr_txns > 1) + binaryheap_build(state->heap); } /* @@ -1094,11 +1095,24 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) ReorderBufferIterTXNEntry *entry; int32 off; - /* nothing there anymore */ - if (state->heap->bh_size == 0) - return NULL; + /* + * If there is only one transaction then it will be at the offset 0. + * Otherwise get the offset from the binary heap. + */ + if (state->nr_txns == 1) + { + off = 0; + if (state->entries[off].change == NULL) + return NULL; + } + else + { + /* nothing there anymore */ + if (state->heap->bh_size == 0) + return NULL; + off = DatumGetInt32(binaryheap_first(state->heap)); + } - off = DatumGetInt32(binaryheap_first(state->heap)); entry = &state->entries[off]; /* free memory we might have "leaked" in the previous *Next call */ @@ -1128,7 +1142,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) state->entries[off].lsn = next_change->lsn; state->entries[off].change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + if (state->nr_txns > 1) + binaryheap_replace_first(state->heap, Int32GetDatum(off)); + return change; } @@ -1165,7 +1181,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) } /* ok, no changes there anymore, remove */ - binaryheap_remove_first(state->heap); + if (state->nr_txns > 1) + 
binaryheap_remove_first(state->heap); + else + entry->change = NULL; return change; } @@ -1196,7 +1215,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, Assert(dlist_is_empty(&state->old_change)); } - binaryheap_free(state->heap); + if (state->nr_txns > 1) + binaryheap_free(state->heap); pfree(state); }