On Tue, Dec 6, 2022 at 5:23 PM shiy.f...@fujitsu.com <shiy.f...@fujitsu.com> wrote:
>
> Hi hackers,
>
> In logical decoding, when logical_decoding_work_mem is exceeded, the changes are
> sent to output plugin in streaming mode. But there is a restriction that the
> minimum value of logical_decoding_work_mem is 64kB. I tried to add a GUC to
> allow sending every change to output plugin without waiting until
> logical_decoding_work_mem is exceeded.
>

Some review comments for patch v1-0001.

1. Typos

In several places "Wheather/wheather" -> "Whether/whether"

======

src/backend/replication/logical/reorderbuffer.c

2. ReorderBufferCheckMemoryLimit

 {
  ReorderBufferTXN *txn;

- /* bail out if we haven't exceeded the memory limit */
- if (rb->size < logical_decoding_work_mem * 1024L)
+ /*
+  * Stream the changes immediately if force_stream_mode is on and the output
+  * plugin supports streaming. Otherwise wait until size exceeds
+  * logical_decoding_work_mem.
+  */
+ bool force_stream = (force_stream_mode && ReorderBufferCanStream(rb));
+
+ /* bail out if force_stream is false and we haven't exceeded the memory limit */
+ if (!force_stream && rb->size < logical_decoding_work_mem * 1024L)
  return;

  /*
-  * Loop until we reach under the memory limit. One might think that just
-  * by evicting the largest (sub)transaction we will come under the memory
-  * limit based on assumption that the selected transaction is at least as
-  * large as the most recent change (which caused us to go over the memory
-  * limit). However, that is not true because a user can reduce the
+  * If force_stream is true, loop until there's no change. Otherwise, loop
+  * until we reach under the memory limit. One might think that just by
+  * evicting the largest (sub)transaction we will come under the memory limit
+  * based on assumption that the selected transaction is at least as large as
+  * the most recent change (which caused us to go over the memory limit).
+  * However, that is not true because a user can reduce the
   * logical_decoding_work_mem to a smaller value before the most recent
   * change.
   */
- while (rb->size >= logical_decoding_work_mem * 1024L)
+ while ((!force_stream && rb->size >= logical_decoding_work_mem * 1024L) ||
+        (force_stream && rb->size > 0))
  {
  /*
   * Pick the largest transaction (or subtransaction) and evict it from

IIUC this logic can be simplified quite a lot just by shifting that
"bail out" condition into the loop.

Something like:

while (true)
{
    /* stop once nothing is left to stream, or we are back under the limit */
    if (!((force_stream && rb->size > 0) ||
          rb->size >= logical_decoding_work_mem * 1024L))
        break;
    ...
}

------
Kind Regards,
Peter Smith.
Fujitsu Australia
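
PS. A quick way to sanity-check a simplified loop condition against the one in the patch is a small standalone comparison. This is only a sketch, not code from the patch. It assumes the loop should keep evicting while size is at or above the limit (or, with force_stream, while anything is buffered). LIMIT_BYTES, keep_evicting_patch, keep_evicting_simplified and check_equivalence are hypothetical names, and LIMIT_BYTES stands in for logical_decoding_work_mem * 1024L at its 64kB minimum.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for logical_decoding_work_mem * 1024L (64kB minimum). */
#define LIMIT_BYTES (64 * 1024L)

/* Loop-continue condition as written in the v1 patch. */
static bool
keep_evicting_patch(bool force_stream, long size)
{
    return (!force_stream && size >= LIMIT_BYTES) ||
           (force_stream && size > 0);
}

/* Simplified condition; equivalent only because LIMIT_BYTES > 0. */
static bool
keep_evicting_simplified(bool force_stream, long size)
{
    return (force_stream && size > 0) || size >= LIMIT_BYTES;
}

/* Assert that both predicates agree on the boundary sizes, for both modes. */
static void
check_equivalence(void)
{
    const long sizes[] = {0, 1, LIMIT_BYTES - 1, LIMIT_BYTES, LIMIT_BYTES + 1};

    for (size_t i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++)
    {
        for (int f = 0; f <= 1; f++)
        {
            bool force_stream = (f == 1);

            assert(keep_evicting_patch(force_stream, sizes[i]) ==
                   keep_evicting_simplified(force_stream, sizes[i]));
        }
    }
}
```

Calling check_equivalence() from a test main() exercises the boundary cases. The two forms agree because, with force_stream set, "size >= limit" already implies "size > 0" for any positive limit.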