After manually applying the patch, I regenerated the diff, which is attached.
On 2020-02-18 4:16 p.m., David Zhang wrote:
1. I tried to apply the patch to PG 12.2, commit
45b88269a353ad93744772791feb6d01bc7e1e42 (HEAD -> REL_12_2, tag: REL_12_2), but it
did not apply. I then checked the patch and got the errors shown below.
$ git apply --check
0001-Fastpath-for-sending-changes-to-output-plugin-in-log.patch
error: patch failed: contrib/test_decoding/logical.conf:1
error: contrib/test_decoding/logical.conf: patch does not apply
error: patch failed: src/backend/replication/logical/reorderbuffer.c:1133
error: src/backend/replication/logical/reorderbuffer.c: patch does not apply
2. I then checked the history of the file "logical.conf" and found only one commit since
2014; that commit does not have the parameter "logical_decoding_work_mem = 64kB".
3. I manually applied the patch, including the changes to
src/backend/replication/logical/reorderbuffer.c, and then ran a simple logical
replication test. It hit a connection issue, shown below:
"table public.pgbench_accounts: INSERT: aid[integer]:4071 bid[integer]:1
abalance[integer]:0 filler[character]:'
'
pg_recvlogical: error: could not receive data from WAL stream: server closed
the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
pg_recvlogical: disconnected; waiting 5 seconds to try again"
4. This connection issue can be reproduced on the PG 12.2 commit mentioned above.
The basic steps are:
4.1 Change "wal_level = logical" in "postgresql.conf"
4.2 Create a logical slot and listen on it:
$ pg_recvlogical -d postgres --slot test --create-slot
$ pg_recvlogical -d postgres --slot test --start -f -
4.3 From another terminal, run the command below:
$ pgbench -i -p 5432 -d postgres
Let me know if I did something wrong. If a new patch is available, I can
re-run the test in the same environment.
--
David
Software Engineer
Highgo Software Inc. (Canada)
www.highgo.ca
diff --git a/contrib/test_decoding/logical.conf
b/contrib/test_decoding/logical.conf
index 367f706651..02595d99d5 100644
--- a/contrib/test_decoding/logical.conf
+++ b/contrib/test_decoding/logical.conf
@@ -1,2 +1,3 @@
wal_level = logical
max_replication_slots = 4
+logical_decoding_work_mem = 64MB
diff --git a/src/backend/replication/logical/reorderbuffer.c
b/src/backend/replication/logical/reorderbuffer.c
index a74fd705b4..62b661dc4d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -989,11 +989,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb,
ReorderBufferTXN *txn,
nr_txns++;
}
- /*
- * TODO: Consider adding fastpath for the rather common nr_txns=1 case,
no
- * need to allocate/build a heap then.
- */
-
/* allocate iteration state */
state = (ReorderBufferIterTXNState *)
MemoryContextAllocZero(rb->context,
@@ -1009,10 +1004,11 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb,
ReorderBufferTXN *txn,
state->entries[off].segno = 0;
}
- /* allocate heap */
- state->heap = binaryheap_allocate(state->nr_txns,
-
ReorderBufferIterCompare,
-
state);
+ /* allocate heap, if we have more than one transaction. */
+ if (nr_txns > 1)
+ state->heap = binaryheap_allocate(state->nr_txns,
+
ReorderBufferIterCompare,
+
state);
/* Now that the state fields are initialized, it is safe to return it.
*/
*iter_state = state;
@@ -1044,7 +1040,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb,
ReorderBufferTXN *txn,
state->entries[off].change = cur_change;
state->entries[off].txn = txn;
- binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+ /* add to heap, only if we have more than one transaction. */
+ if (nr_txns > 1)
+ binaryheap_add_unordered(state->heap,
Int32GetDatum(off++));
}
/* add subtransactions if they contain changes */
@@ -1073,12 +1071,15 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb,
ReorderBufferTXN *txn,
state->entries[off].change = cur_change;
state->entries[off].txn = cur_txn;
- binaryheap_add_unordered(state->heap,
Int32GetDatum(off++));
+ /* add to heap, only if we have more than one
transaction. */
+ if (nr_txns > 1)
+ binaryheap_add_unordered(state->heap,
Int32GetDatum(off++));
}
}
/* assemble a valid binary heap */
- binaryheap_build(state->heap);
+ if (nr_txns > 1)
+ binaryheap_build(state->heap);
}
/*
@@ -1094,11 +1095,24 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb,
ReorderBufferIterTXNState *state)
ReorderBufferIterTXNEntry *entry;
int32 off;
- /* nothing there anymore */
- if (state->heap->bh_size == 0)
- return NULL;
+ /*
+ * If there is only one transaction then it will be at the offset 0.
+ * Otherwise get the offset from the binary heap.
+ */
+ if (state->nr_txns == 1)
+ {
+ off = 0;
+ if (state->entries[off].change == NULL)
+ return NULL;
+ }
+ else
+ {
+ /* nothing there anymore */
+ if (state->heap->bh_size == 0)
+ return NULL;
+ off = DatumGetInt32(binaryheap_first(state->heap));
+ }
- off = DatumGetInt32(binaryheap_first(state->heap));
entry = &state->entries[off];
/* free memory we might have "leaked" in the previous *Next call */
@@ -1128,7 +1142,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb,
ReorderBufferIterTXNState *state)
state->entries[off].lsn = next_change->lsn;
state->entries[off].change = next_change;
- binaryheap_replace_first(state->heap, Int32GetDatum(off));
+ if (state->nr_txns > 1)
+ binaryheap_replace_first(state->heap,
Int32GetDatum(off));
+
return change;
}
@@ -1165,7 +1181,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb,
ReorderBufferIterTXNState *state)
}
/* ok, no changes there anymore, remove */
- binaryheap_remove_first(state->heap);
+ if (state->nr_txns > 1)
+ binaryheap_remove_first(state->heap);
+ else
+ entry->change = NULL;
return change;
}
@@ -1196,7 +1215,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
Assert(dlist_is_empty(&state->old_change));
}
- binaryheap_free(state->heap);
+ if (state->nr_txns > 1)
+ binaryheap_free(state->heap);
pfree(state);
}