On Tue, Aug 24, 2021 at 8:48 PM Robert Haas <robertmh...@gmail.com> wrote:
> On Fri, Aug 6, 2021 at 4:31 AM Dilip Kumar <dilipbal...@gmail.com> wrote:
> > Results: (query EXPLAIN ANALYZE SELECT * FROM t;)
> > 1) Non-parallel (default)
> >    Execution Time: 31627.492 ms
> >
> > 2) Parallel with 4 workers (force by setting parallel_tuple_cost to 0)
> >    Execution Time: 37498.672 ms
> >
> > 3) Same as above (2) but with the patch.
> >    Execution Time: 23649.287 ms
>
> This strikes me as an amazingly good result. I guess before seeing these
> results, I would have said that you can't reasonably expect parallel
> query to win on a query like this because there isn't enough for the
> workers to do. It's not like they are spending time evaluating filter
> conditions or anything like that - they're just fetching tuples off of
> disk pages and sticking them into a queue. And it's unclear to me why it
> should be better to have a bunch of processes doing that instead of just
> one. I would have thought, looking at just (1) and (2), that parallelism
> gained nothing and communication overhead lost 6 seconds.
>
> But what this suggests is that parallelism gained at least 8 seconds,
> and communication overhead lost at least 14 seconds. In fact...

Right, good observation.

> > - If I apply both Experiment#1 and Experiment#2 patches together then,
> > we can further reduce the execution time to 20963.539 ms (with 4
> > workers and 4MB tuple queue size)
>
> ...this suggests that parallelism actually gained at least 10-11
> seconds, and the communication overhead lost at least 15-16 seconds.

Yes.

> If that's accurate, it's pretty crazy. We might need to drastically
> reduce the value of parallel_tuple_cost if these results hold up and
> this patch gets committed.

In one of my experiments [Test1] I have noticed that even on head the
forced parallel plan is significantly faster than the non-parallel plan,
and with the patch it is faster still. There may already be cases where a
forced parallel plan wins, but that alone does not tell us whether we can
safely reduce parallel_tuple_cost. With the patch, however, the parallel
tuple queue is definitely faster than what we have now, so I agree we
should consider reducing parallel_tuple_cost once this patch is in.

Additionally, I've done some more experiments with artificial workloads,
as well as workloads where the parallel plan is selected by default, and
in all cases I've seen a significant improvement. The gain is directly
proportional to the load on the tuple queue, as expected.

Test1: (Workers return all tuples, but only a few tuples are returned to
the client)
----------------------------------------------------
INSERT INTO t SELECT i%10, repeat('a', 200) from generate_series(1,200000000) as i;
set max_parallel_workers_per_gather=4;
Target Query: SELECT random() FROM t GROUP BY a;

Non-parallel (default plan): 77170.421 ms
Parallel (parallel_tuple_cost=0): 53794.324 ms
Parallel with patch (parallel_tuple_cost=0): 42567.850 ms

~20% gain compared to the forced parallel plan, ~45% gain compared to the
default (non-parallel) plan.

Test2: (Parallel case with default parallel_tuple_cost)
----------------------------------------------
INSERT INTO t SELECT i, repeat('a', 200) from generate_series(1,200000000) as i;
set max_parallel_workers_per_gather=4;
SELECT * from t WHERE a < 17500000;

Parallel (default plan): 23730.054 ms
Parallel with patch (default plan): 21614.251 ms

8 to 10% gain compared to the default parallel plan.

I have cleaned up the patch and will add it to the September commitfest.
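For anyone who wants to repeat the comparison, this is roughly the session
I use for the Test2 numbers above. It is only a sketch, assuming the same
table t populated as in Test2; exact timings will of course vary by machine.

-- Assumes table t populated as in Test2 above.
SET max_parallel_workers_per_gather = 4;

-- Default costing: the planner decides whether a Gather plan is worthwhile.
EXPLAIN (ANALYZE) SELECT * FROM t WHERE a < 17500000;

-- Force the parallel plan by making tuple transfer "free", which isolates
-- the overhead of pushing tuples through the shared memory queue.
SET parallel_tuple_cost = 0;
EXPLAIN (ANALYZE) SELECT * FROM t WHERE a < 17500000;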
I am planning to do further testing to identify the optimal batch size for
different workloads. With the above workloads I am seeing similar results
for batch sizes from 4k to 16k (1/4 of the ring size), so in the attached
patch I have kept it at 1/4 of the ring size. We might change that based
on more analysis and testing.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From 1142c46eaa3dcadc4bb28133ce873476ba3067c4 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 4 Aug 2021 16:51:01 +0530
Subject: [PATCH v1] Optimize parallel tuple send (shm_mq_send_bytes)

Do not update shm_mq's mq_bytes_written until we have written an amount
of data greater than 1/4th of the ring size.  This will prevent frequent
CPU cache misses, and it will also avoid frequent SetLatch() calls, which
are quite expensive.
---
 src/backend/executor/tqueue.c         |  2 +-
 src/backend/libpq/pqmq.c              |  7 +++-
 src/backend/storage/ipc/shm_mq.c      | 65 +++++++++++++++++++++++++++++------
 src/include/storage/shm_mq.h          |  8 +++--
 src/test/modules/test_shm_mq/test.c   |  7 ++--
 src/test/modules/test_shm_mq/worker.c |  2 +-
 6 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 7af9fbe..eb0cbd7 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -60,7 +60,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the tuple itself. */
 	tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
-	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
+	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
 
 	if (should_free)
 		pfree(tuple);
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index d1a1f47..846494b 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -154,7 +154,12 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 
 	for (;;)
 	{
-		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+		/*
+		 * Immediately notify the receiver by passing force_flush as true so
+		 * that the shared memory value is updated before we send the
+		 * parallel message signal right after this.
+		 */
+		result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
 
 		if (pq_mq_parallel_leader_pid != 0)
 			SendProcSignal(pq_mq_parallel_leader_pid,
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 91a7093..3e1781c 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -120,6 +120,12 @@ struct shm_mq
  * message itself, and mqh_expected_bytes - which is used only for reads -
  * tracks the expected total size of the payload.
  *
+ * mqh_send_pending is the number of bytes that have been written to the
+ * queue but not yet updated in shared memory.  We do not update it until
+ * the written data amounts to 1/4th of the ring size or the tuple queue is
+ * full.  This prevents frequent CPU cache misses, and it also avoids
+ * frequent SetLatch() calls, which are quite expensive.
+ *
  * mqh_counterparty_attached tracks whether we know the counterparty to have
  * attached to the queue at some previous point.  This lets us avoid some
  * mutex acquisitions.
@@ -139,6 +145,7 @@ struct shm_mq_handle
 	Size		mqh_consume_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
+	Size		mqh_send_pending;
 	bool		mqh_length_word_complete;
 	bool		mqh_counterparty_attached;
 	MemoryContext mqh_context;
@@ -294,6 +301,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_consume_pending = 0;
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_expected_bytes = 0;
+	mqh->mqh_send_pending = 0;
 	mqh->mqh_length_word_complete = false;
 	mqh->mqh_counterparty_attached = false;
 	mqh->mqh_context = CurrentMemoryContext;
@@ -317,16 +325,22 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
 /*
  * Write a message into a shared message queue.
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver if it is already attached.  Otherwise, we don't
+ * update it until we have written an amount of data greater than 1/4th of the
+ * ring size.
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+			bool force_flush)
 {
 	shm_mq_iovec iov;
 
 	iov.data = data;
 	iov.len = nbytes;
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+	return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
 }
 
 /*
@@ -343,9 +357,12 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
  * arguments, each time the process latch is set.  (Once begun, the sending
  * of a message cannot be aborted except by detaching from the queue; changing
  * the length or payload will corrupt the queue.)
+ *
+ * For force_flush, refer to the comments atop the shm_mq_send interface.
  */
 shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+			 bool force_flush)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
@@ -518,8 +535,19 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Notify receiver of the newly-written data, and return. */
-	SetLatch(&receiver->procLatch);
+	/*
+	 * If we have written more than 1/4 of the ring or the caller has
+	 * requested a force flush, mark the data as written in shared memory
+	 * and notify the receiver.  For more detail, refer to the comments atop
+	 * the shm_mq_handle structure.
+	 */
+	if (mqh->mqh_send_pending > mq->mq_ring_size / 4 || force_flush)
+	{
+		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+		SetLatch(&receiver->procLatch);
+		mqh->mqh_send_pending = 0;
+	}
+
 	return SHM_MQ_SUCCESS;
 }
 
@@ -816,6 +844,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+	/* Before detaching, make any pending writes visible to the receiver. */
+	if (mqh->mqh_send_pending > 0)
+	{
+		shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+		mqh->mqh_send_pending = 0;
+	}
+
 	/* Notify counterparty that we're outta here. */
 	shm_mq_detach_internal(mqh->mqh_queue);
 
@@ -894,7 +929,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
 		/* Compute number of ring buffer bytes used and available. */
 		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
 		Assert(wb >= rb);
 		used = wb - rb;
 		Assert(used <= ringsize);
@@ -951,6 +986,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else if (available == 0)
 		{
+			/* Update the pending send bytes in the shared memory. */
+			shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
 			/*
 			 * Since mq->mqh_counterparty_attached is known to be true at this
 			 * point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +997,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			Assert(mqh->mqh_counterparty_attached);
 			SetLatch(&mq->mq_receiver->procLatch);
 
+			/*
+			 * We have just flushed the mqh_send_pending bytes to shared
+			 * memory, so reset the counter.
+			 */
+			mqh->mqh_send_pending = 0;
+
 			/* Skip manipulation of our latch if nowait = true. */
 			if (nowait)
 			{
@@ -1009,13 +1053,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
-			 * For efficiency, we don't set the reader's latch here. We'll do
-			 * that only when the buffer fills up or after writing an entire
-			 * message.
+			 * For efficiency, we don't update the bytes written in the
+			 * shared memory and also don't set the reader's latch here.
+			 * Refer to the comments atop the shm_mq_handle structure for
+			 * more information.
 			 */
+			mqh->mqh_send_pending += MAXALIGN(sendnow);
 		}
 	}
 
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index e693f3f..cb1c555 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -70,11 +70,13 @@ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-								 Size nbytes, const void *data, bool nowait);
-extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
-								  shm_mq_iovec *iov, int iovcnt, bool nowait);
+								 Size nbytes, const void *data, bool nowait,
+								 bool force_flush);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
+								  int iovcnt, bool nowait, bool force_flush);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 									Size *nbytesp, void **datap, bool nowait);
+extern void shm_mq_flush(shm_mq_handle *mqh);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c
index 2d8d695..be074f0 100644
--- a/src/test/modules/test_shm_mq/test.c
+++ b/src/test/modules/test_shm_mq/test.c
@@ -73,7 +73,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
 
 	/* Send the initial message. */
-	res = shm_mq_send(outqh, message_size, message_contents, false);
+	res = shm_mq_send(outqh, message_size, message_contents, false, true);
 	if (res != SHM_MQ_SUCCESS)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -97,7 +97,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -177,7 +177,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
 		 */
 		if (send_count < loop_count)
 		{
-			res = shm_mq_send(outqh, message_size, message_contents, true);
+			res = shm_mq_send(outqh, message_size, message_contents, true,
+							  true);
 			if (res == SHM_MQ_SUCCESS)
 			{
 				++send_count;
diff --git a/src/test/modules/test_shm_mq/worker.c b/src/test/modules/test_shm_mq/worker.c
index 2180776..9b037b9 100644
--- a/src/test/modules/test_shm_mq/worker.c
+++ b/src/test/modules/test_shm_mq/worker.c
@@ -190,7 +190,7 @@ copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			break;
 	}
-- 
1.8.3.1