Hi,

On 2025-08-14 19:36:49 -0400, Andres Freund wrote:
> On 2025-08-14 17:55:53 -0400, Peter Geoghegan wrote:
> > On Thu, Aug 14, 2025 at 5:06 PM Peter Geoghegan <p...@bowt.ie> wrote:
> > > > We can optimize that by deferring the StartBufferIO() if we're
> > > > encountering a buffer that is undergoing IO, at the cost of some
> > > > complexity.  I'm not sure real-world queries will often encounter
> > > > the pattern of the same block being read in by a read stream
> > > > multiple times in close proximity sufficiently often to make that
> > > > worth it.
> > >
> > > We definitely need to be prepared for duplicate prefetch requests in
> > > the context of index scans.
> >
> > Can you (or anybody else) think of a quick and dirty way of working
> > around the problem on the read stream side? I would like to prioritize
> > getting the patch into a state where its overall performance profile
> > "feels right". From there we can iterate on fixing the underlying
> > issues in more principled ways.
>
> I think I can see a way to fix the issue below the read stream layer.
> Basically, whenever AsyncReadBuffers() finds a buffer that has ongoing IO,
> instead of waiting, as we do today, copy the wref into the
> ReadBuffersOperation and set a new flag indicating that we are waiting for
> an IO that this operation did not start. Then, in WaitReadBuffers(), we wait
> for such foreign-started IOs. That has to be somewhat different code from
> today, because we have to deal with the possibility that the "foreign" IO
> failed.
>
> I'll try writing a prototype for that tomorrow. I think to actually get that
> into a committable shape we need a test harness (probably a read stream
> controlled by an SQL function that gets an array of buffers).

Attached is a prototype of this approach. It does seem to fix this issue.
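
FWIW, for the test harness mentioned above, something along these lines is
what I have in mind -- just a rough, untested sketch with made-up names: an
SQL function taking a relation and an array of block numbers and pumping
them through a read stream:

    /* sketch only: read_stream_test(regclass, int8[]), names are made up */
    #include "postgres.h"

    #include "access/relation.h"
    #include "catalog/pg_type_d.h"
    #include "common/relpath.h"
    #include "fmgr.h"
    #include "storage/bufmgr.h"
    #include "storage/read_stream.h"
    #include "utils/array.h"

    PG_MODULE_MAGIC;

    typedef struct TestStreamState
    {
        Datum      *blocks;     /* caller-supplied block numbers */
        int         nblocks;
        int         next;
    } TestStreamState;

    /* read stream callback handing out the caller-supplied block numbers */
    static BlockNumber
    test_stream_next_block(ReadStream *stream,
                           void *callback_private_data,
                           void *per_buffer_data)
    {
        TestStreamState *state = callback_private_data;

        if (state->next >= state->nblocks)
            return InvalidBlockNumber;

        return (BlockNumber) DatumGetInt64(state->blocks[state->next++]);
    }

    PG_FUNCTION_INFO_V1(read_stream_test);

    Datum
    read_stream_test(PG_FUNCTION_ARGS)
    {
        Oid         relid = PG_GETARG_OID(0);
        ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(1);
        Relation    rel;
        ReadStream *stream;
        TestStreamState state = {0};
        bool       *nulls;

        /* NULL array elements aren't handled, this is test-only code */
        deconstruct_array_builtin(arr, INT8OID,
                                  &state.blocks, &nulls, &state.nblocks);

        rel = relation_open(relid, AccessShareLock);

        stream = read_stream_begin_relation(READ_STREAM_DEFAULT, NULL,
                                            rel, MAIN_FORKNUM,
                                            test_stream_next_block,
                                            &state, 0);

        /* consume the stream, releasing each buffer as it arrives */
        for (;;)
        {
            Buffer      buf = read_stream_next_buffer(stream, NULL);

            if (buf == InvalidBuffer)
                break;
            ReleaseBuffer(buf);
        }

        read_stream_end(stream);
        relation_close(rel, AccessShareLock);

        PG_RETURN_VOID();
    }

Calling that with duplicate block numbers in close proximity, e.g.
read_stream_test('t'::regclass, array[1,1,2,2]::int8[]), should make it
easy to exercise the already-in-progress-IO path deterministically.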

New code disabled:

    #### backwards sequential table ####
    ┌──────────────────────────────────────────────────────────────────────┐
    │                              QUERY PLAN                              │
    ├──────────────────────────────────────────────────────────────────────┤
    │ Index Scan Backward using t_pk on t (actual rows=1048576.00 loops=1) │
    │   Index Cond: ((a >= 16336) AND (a <= 49103))                        │
    │   Index Searches: 1                                                  │
    │   Buffers: shared hit=10291 read=49933                               │
    │   I/O Timings: shared read=213.277                                   │
    │ Planning:                                                            │
    │   Buffers: shared hit=91 read=19                                     │
    │   I/O Timings: shared read=2.124                                     │
    │ Planning Time: 3.269 ms                                              │
    │ Execution Time: 1023.279 ms                                          │
    └──────────────────────────────────────────────────────────────────────┘
    (10 rows)


New code enabled:

    #### backwards sequential table ####
    ┌──────────────────────────────────────────────────────────────────────┐
    │                              QUERY PLAN                              │
    ├──────────────────────────────────────────────────────────────────────┤
    │ Index Scan Backward using t_pk on t (actual rows=1048576.00 loops=1) │
    │   Index Cond: ((a >= 16336) AND (a <= 49103))                        │
    │   Index Searches: 1                                                  │
    │   Buffers: shared hit=10291 read=49933                               │
    │   I/O Timings: shared read=217.225                                   │
    │ Planning:                                                            │
    │   Buffers: shared hit=91 read=19                                     │
    │   I/O Timings: shared read=2.009                                     │
    │ Planning Time: 2.685 ms                                              │
    │ Execution Time: 602.987 ms                                           │
    └──────────────────────────────────────────────────────────────────────┘
    (10 rows)


With the change enabled, the sequential query is faster than the random query:

    #### backwards random table ####
    ┌──────────────────────────────────────────────────────────────────────────────────────────────┐
    │                                          QUERY PLAN                                          │
    ├──────────────────────────────────────────────────────────────────────────────────────────────┤
    │ Index Scan Backward using t_randomized_pk on t_randomized (actual rows=1048576.00 loops=1)   │
    │   Index Cond: ((a >= 16336) AND (a <= 49103))                                                │
    │   Index Searches: 1                                                                          │
    │   Buffers: shared hit=6085 read=77813                                                        │
    │   I/O Timings: shared read=347.285                                                           │
    │ Planning:                                                                                    │
    │   Buffers: shared hit=127 read=5                                                             │
    │   I/O Timings: shared read=1.001                                                             │
    │ Planning Time: 1.751 ms                                                                      │
    │ Execution Time: 820.544 ms                                                                   │
    └──────────────────────────────────────────────────────────────────────────────────────────────┘
    (10 rows)



Greetings,

Andres Freund
From 433e82c94fd1c1b502a2b22e9c3874c1e766c05c Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 15 Aug 2025 11:01:52 -0400
Subject: [PATCH v1] bufmgr: aio: Prototype for not waiting for
 already-in-progress IO

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufmgr.h        |   1 +
 src/backend/storage/buffer/bufmgr.c | 133 ++++++++++++++++++++++++++--
 2 files changed, 125 insertions(+), 9 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 41fdc1e7693..7ddb867bc99 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -137,6 +137,7 @@ struct ReadBuffersOperation
        int                     flags;
        int16           nblocks;
        int16           nblocks_done;
+       bool            foreign_io;
        PgAioWaitRef io_wref;
        PgAioReturn io_return;
 };
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fd7e21d96d3..de755fd53ad 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1557,6 +1557,41 @@ ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
                return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
 
+static inline bool
+ReadBuffersIOAlreadyInProgress(ReadBuffersOperation *operation, Buffer buffer)
+{
+       BufferDesc *desc;
+       uint32          buf_state;
+       PgAioWaitRef iow;
+
+       pgaio_wref_clear(&iow);
+
+       if (BufferIsLocal(buffer))
+       {
+               desc = GetLocalBufferDescriptor(-buffer - 1);
+               buf_state = pg_atomic_read_u32(&desc->state);
+               if (buf_state & BM_IO_IN_PROGRESS)
+                       iow = desc->io_wref;
+       }
+       else
+       {
+               desc = GetBufferDescriptor(buffer - 1);
+               buf_state = LockBufHdr(desc);
+
+               if (buf_state & BM_IO_IN_PROGRESS)
+                       iow = desc->io_wref;
+               UnlockBufHdr(desc, buf_state);
+       }
+
+       if (pgaio_wref_valid(&iow))
+       {
+               operation->io_wref = iow;
+               return true;
+       }
+
+       return false;
+}
+
 /*
  * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
  */
@@ -1689,7 +1724,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
                         *
                         * we first check if we already know the IO is complete.
                         */
-                       if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+                       if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
                                !pgaio_wref_check_done(&operation->io_wref))
                        {
                                instr_time      io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1708,11 +1743,38 @@ WaitReadBuffers(ReadBuffersOperation *operation)
                                Assert(pgaio_wref_check_done(&operation->io_wref));
                        }
 
-                       /*
-                        * We now are sure the IO completed. Check the results. This
-                        * includes reporting on errors if there were any.
-                        */
-                       ProcessReadBuffersResult(operation);
+                       if (operation->foreign_io)
+                       {
+                               Buffer          buffer = operation->buffers[operation->nblocks_done];
+                               BufferDesc *desc;
+                               uint32          buf_state;
+
+                               if (BufferIsLocal(buffer))
+                               {
+                                       desc = GetLocalBufferDescriptor(-buffer - 1);
+                                       buf_state = pg_atomic_read_u32(&desc->state);
+                               }
+                               else
+                               {
+                                       desc = GetBufferDescriptor(buffer - 1);
+                                       buf_state = LockBufHdr(desc);
+                                       UnlockBufHdr(desc, buf_state);
+                               }
+
+                               if (buf_state & BM_VALID)
+                               {
+                                       operation->nblocks_done += 1;
+                                       Assert(operation->nblocks_done <= operation->nblocks);
+                               }
+                       }
+                       else
+                       {
+                               /*
+                                * We now are sure the IO completed. Check the results. This
+                                * includes reporting on errors if there were any.
+                                */
+                               ProcessReadBuffersResult(operation);
+                       }
                }
 
                /*
@@ -1798,6 +1860,56 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
                io_object = IOOBJECT_RELATION;
        }
 
+       /*
+        * If AIO is in progress, be it in this backend or another backend, we
+        * just associate the wait reference with the operation and wait in
+        * WaitReadBuffers(). This turns out to be important for performance in
+        * two workloads:
+        *
+        * 1) A read stream that has to read the same block multiple times within
+        *        the readahead distance. This can happen e.g. for the table accesses
+        *        of an index scan.
+        *
+        * 2) Concurrent scans by multiple backends on the same relation.
+        *
+        * If we were to synchronously wait for the in-progress IO, we'd not be
+        * able to keep enough I/O in flight.
+        *
+        *
+        * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+        * ReadBuffersOperation that WaitReadBuffers then can wait on.
+        */
+       if (1 && ReadBuffersIOAlreadyInProgress(operation, buffers[nblocks_done]))
+       {
+               /* FIXME: probably need to wait if io_method == sync? */
+
+               *nblocks_progress = 1;
+               did_start_io = true;
+               operation->foreign_io = true;
+
+               if (0)
+                       elog(LOG, "using foreign IO path");
+
+               /* FIXME: trace point */
+
+               /*
+                * FIXME: how should this be accounted for in stats? Account as a hit
+                * for now, quite likely *we* started this IO.
+                */
+               if (persistence == RELPERSISTENCE_TEMP)
+                       pgBufferUsage.local_blks_hit += 1;
+               else
+                       pgBufferUsage.shared_blks_hit += 1;
+
+               if (operation->rel)
+                       pgstat_count_buffer_hit(operation->rel);
+               pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+               if (VacuumCostActive)
+                       VacuumCostBalance += VacuumCostPageHit;
+
+               return true;
+       }
+
        /*
         * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
         * flag. The reason for that is that, hopefully, zero_damaged_pages isn't
@@ -1855,9 +1967,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
        /*
         * Check if we can start IO on the first to-be-read buffer.
         *
-        * If an I/O is already in progress in another backend, we want to wait
-        * for the outcome: either done, or something went wrong and we will
-        * retry.
+        * If a synchronous I/O is in progress in another backend (it can't be this
+        * backend), we want to wait for the outcome: either done, or something
+        * went wrong and we will retry.
         */
        if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
        {
@@ -1970,6 +2082,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
                if (VacuumCostActive)
                        VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
 
+               operation->foreign_io = false;
                *nblocks_progress = io_buffers_len;
                did_start_io = true;
        }
@@ -5986,6 +6099,8 @@ WaitIO(BufferDesc *buf)
                 */
                if (pgaio_wref_valid(&iow))
                {
+                       if (0)
+                               elog(LOG, "foreign wait");
                        pgaio_wref_wait(&iow);
 
                        /*
-- 
2.48.1.76.g4e746b1a31.dirty
