I had an idea that might improve parallel seqscans on the same relation. If you have lots of concurrent seqscans going on a large relation, the cache hit ratio is very low. But, if the seqscans are concurrent on the same relation, there may be something to gain by starting a seqscan near the page being accessed by an already-in-progress seqscan, and wrapping back around to that start location. That would make some use of the shared buffers, which would otherwise just be cache pollution.
I made a proof-of-concept implementation, which is entirely in heapam.c, except for one addition to the HeapScanDesc struct in relscan.h. It is not at all up to production quality; there are things I know that need to be addressed. Basically, I just modified heapam.c to be able to start at any page in the relation. Then, every time it reads a new page, I have it mark the relation's oid and the page number in a shared mem segment. Everytime a new scan is started, it reads the shared mem segment, and if the relation's oid matches, it starts the scan at the page number it found in the shared memory. Otherwise, it starts the scan at 0. There are a couple obvious issues, one is that my whole implementation doesn't account for reverse scans at all (since initscan doesn't know what direction the scan will move in), but that shouldn't be a major problem since at worst it will be the current behavior (aside: can someone tell me how to force reverse scans so I can test that better?). Another is that there's a race condition with the shared mem, and that's out of pure laziness on my part. This method is really only effective at all if there is a significant amount of disk i/o. If it's pulling the data from O/S buffers the various scans will diverge too much and not be using eachother's shared buffers. I tested with shared_buffers=500 and all stats on. I used 60 threads performing 30 seqscans each in my script ssf.rb (I refer to my modification as "sequential scan follower" or ssf). Here are some results with my modifications: $ time ./ssf.rb # my script real 4m22.476s user 0m0.389s sys 0m0.186s test=# select relpages from pg_class where relname='test_ssf'; relpages ---------- 1667 (1 row) test=# select count(*) from test_ssf; count -------- 200000 (1 row) test=# select pg_stat_get_blocks_hit(17232) as hit, pg_stat_get_blocks_fetched(17232) as total; hit | total --------+--------- 971503 | 3353963 (1 row) Or, approx. 29% cache hit. Here are the results without my modifications: test=# select relpages from pg_class where relname='test_ssf'; relpages ---------- 1667 (1 row) test=# select count(*) from test_ssf; count -------- 200000 (1 row) test=# select pg_stat_get_blocks_hit(17231) as hit, pg_stat_get_blocks_fetched(17231) as total; hit | total --------+--------- 199999 | 3353963 (1 row) Or, approx. 6% cache hit. Note: the oid is different, because I have two seperately initdb'd data directories, one for the modified version, one for the unmodified 8.0.0. This is the first time I've really modified the PG source code to do anything that looked promising, so this is more of a question than anything else. Is it promising? Is this a potentially good approach? I'm happy to post more test data and more documentation, and I'd also be happy to bring the code to production quality. However, before I spend too much more time on that, I'd like to get a general response from a 3rd party to let me know if I'm off base. Regards, Jeff Davis
--- postgresql-8.0.0/src/backend/access/heap/heapam.c 2004-12-31 13:59:16.000000000 -0800 +++ postgresql-8.0.0-ssf/src/backend/access/heap/heapam.c 2005-02-24 20:37:24.596626668 -0800 @@ -65,6 +65,50 @@ * ---------------------------------------------------------------- */ + +#include <sys/shm.h> +#include <sys/types.h> +#include <sys/ipc.h> +/* + Retrieves a location of an in-progress seqscan on relid + and returns the page location +*/ +static BlockNumber get_page_loc(Oid relid) { + int shmid; + char *shm; + Oid shmrelid; + BlockNumber loc; + shmid = shmget(0x11aa55cc,(sizeof(Oid)+sizeof(BlockNumber)), + 0666|IPC_CREAT); + shm = shmat(shmid,NULL,0); + shmrelid = *((Oid*)shm); + loc = *((BlockNumber*)(shm+sizeof(Oid))); + shmdt(shm); + /*elog(NOTICE,"Getting shm: %u %u",shmrelid,loc);*/ + if (shmrelid == relid) + return loc; + else + return 0; +} + +/* + Places (relid,loc) in a shared mem structure +*/ +static int report_page_loc(Oid relid, BlockNumber loc) +{ + int shmid; + char *shm; + shmid = shmget(0x11aa55cc,(sizeof(Oid)+sizeof(BlockNumber)), + 0666|IPC_CREAT); + shm = shmat(shmid,NULL,0); + + /*elog(NOTICE,"Setting shm: %u %u",relid,loc);*/ + *((Oid*)shm) = relid; + *((BlockNumber*)(shm+sizeof(Oid))) = loc; + shmdt(shm); + return 0; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -85,6 +129,18 @@ scan->rs_ctup.t_data = NULL; scan->rs_cbuf = InvalidBuffer; + /* initialize start page */ + scan->rs_start_page = get_page_loc(RelationGetRelid(scan->rs_rd)); + /* + if(scan->rs_nblocks) + scan->rs_start_page = 5 % scan->rs_nblocks; + else + scan->rs_start_page = 0; + */ + /*elog(NOTICE,"Scan started on relid: %u, pages: %d, page: %d", + RelationGetRelid(scan->rs_rd), + scan->rs_nblocks, + scan->rs_start_page);*/ /* we don't have a marked position... */ ItemPointerSetInvalid(&(scan->rs_mctid)); @@ -115,261 +171,280 @@ Snapshot snapshot, int nkeys, ScanKey key, - BlockNumber pages) + BlockNumber pages, + HeapScanDesc scan) { - ItemId lpp; - Page dp; - BlockNumber page; - int lines; - OffsetNumber lineoff; - int linesleft; - ItemPointer tid; - - tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self); - - /* - * debugging stuff - * - * check validity of arguments, here and for other functions too Note: no - * locking manipulations needed--this is a local function - */ + ItemId lpp; + Page dp; + BlockNumber page; + int lines; + OffsetNumber lineoff; + int linesleft; + ItemPointer tid; + + tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self); + + /* + * debugging stuff + * + * check validity of arguments, here and for other functions too Note: no + * locking manipulations needed--this is a local function + */ #ifdef HEAPDEBUGALL - if (ItemPointerIsValid(tid)) - elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)", - RelationGetRelationName(relation), tid, tid->ip_blkid, - tid->ip_posid, dir); - else - elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)", - RelationGetRelationName(relation), tid, dir); - - elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key); - - elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p", - relation->rd_rel->relkind, RelationGetRelationName(relation), - snapshot); + if (ItemPointerIsValid(tid)) + elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)", + RelationGetRelationName(relation), tid, tid->ip_blkid, + tid->ip_posid, dir); + else + elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)", + RelationGetRelationName(relation), tid, dir); + + elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key); + + elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p", + relation->rd_rel->relkind, RelationGetRelationName(relation), + snapshot); #endif /* HEAPDEBUGALL */ - if (!ItemPointerIsValid(tid)) - { - Assert(!PointerIsValid(tid)); - tid = NULL; - } - - tuple->t_tableOid = relation->rd_id; - - /* - * return null immediately if relation is empty - */ - if (pages == 0) - { - if (BufferIsValid(*buffer)) - ReleaseBuffer(*buffer); - *buffer = InvalidBuffer; - tuple->t_datamcxt = NULL; - tuple->t_data = NULL; - return; - } - - /* - * calculate next starting lineoff, given scan direction - */ - if (dir == 0) - { - /* - * ``no movement'' scan direction: refetch same tuple - */ - if (tid == NULL) - { - if (BufferIsValid(*buffer)) - ReleaseBuffer(*buffer); - *buffer = InvalidBuffer; - tuple->t_datamcxt = NULL; - tuple->t_data = NULL; - return; - } - - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - ItemPointerGetBlockNumber(tid)); - - LockBuffer(*buffer, BUFFER_LOCK_SHARE); - - dp = (Page) BufferGetPage(*buffer); - lineoff = ItemPointerGetOffsetNumber(tid); - lpp = PageGetItemId(dp, lineoff); - - tuple->t_datamcxt = NULL; - tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - tuple->t_len = ItemIdGetLength(lpp); - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - - return; - } - else if (dir < 0) - { - /* - * reverse scan direction - */ - if (tid == NULL) - { - page = pages - 1; /* final page */ - } - else - { - page = ItemPointerGetBlockNumber(tid); /* current page */ - } - - Assert(page < pages); - - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - page); - - LockBuffer(*buffer, BUFFER_LOCK_SHARE); - - dp = (Page) BufferGetPage(*buffer); - lines = PageGetMaxOffsetNumber(dp); - if (tid == NULL) - { - lineoff = lines; /* final offnum */ - } - else - { - lineoff = /* previous offnum */ - OffsetNumberPrev(ItemPointerGetOffsetNumber(tid)); - } - /* page and lineoff now reference the physically previous tid */ - } + if (!ItemPointerIsValid(tid)) + { + Assert(!PointerIsValid(tid)); + tid = NULL; + } + + tuple->t_tableOid = relation->rd_id; + + /* + * return null immediately if relation is empty + */ + if (pages == 0) + { + if (BufferIsValid(*buffer)) + ReleaseBuffer(*buffer); + *buffer = InvalidBuffer; + tuple->t_datamcxt = NULL; + tuple->t_data = NULL; + return; + } + + /* + * calculate next starting lineoff, given scan direction + */ + if (dir == 0) + { + /*elog(NOTICE,"NoMovement Scan started\n");*/ + /* + * ``no movement'' scan direction: refetch same tuple + */ + if (tid == NULL) + { + if (BufferIsValid(*buffer)) + ReleaseBuffer(*buffer); + *buffer = InvalidBuffer; + tuple->t_datamcxt = NULL; + tuple->t_data = NULL; + return; + } + + *buffer = ReleaseAndReadBuffer(*buffer, + relation, + ItemPointerGetBlockNumber(tid)); + + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + + dp = (Page) BufferGetPage(*buffer); + lineoff = ItemPointerGetOffsetNumber(tid); + lpp = PageGetItemId(dp, lineoff); + + tuple->t_datamcxt = NULL; + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + return; + } + else if (dir < 0) + { + /* + * reverse scan direction + */ + if (tid == NULL) + { + /*page = pages-1;*/ /* final page */ + page = scan->rs_start_page; + } + else + { + page = ItemPointerGetBlockNumber(tid); /* current page */ + } + + Assert(page < pages); + + *buffer = ReleaseAndReadBuffer(*buffer, + relation, + page); + + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + + dp = (Page) BufferGetPage(*buffer); + lines = PageGetMaxOffsetNumber(dp); + if (tid == NULL) + { + lineoff = lines; /* final offnum */ + } + else + { + lineoff = /* previous offnum */ + OffsetNumberPrev(ItemPointerGetOffsetNumber(tid)); + } + /* page and lineoff now reference the physically previous tid */ + } + else + { + /* + * forward scan direction + */ + if (tid == NULL) + { + page = scan->rs_start_page; /* first page */ + lineoff = FirstOffsetNumber; /* first offnum */ + } + else + { + page = ItemPointerGetBlockNumber(tid); /* current page */ + lineoff = /* next offnum */ + OffsetNumberNext(ItemPointerGetOffsetNumber(tid)); + } + + Assert(page < pages); + + *buffer = ReleaseAndReadBuffer(*buffer, + relation, + page); + + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + + dp = (Page) BufferGetPage(*buffer); + lines = PageGetMaxOffsetNumber(dp); + /* page and lineoff now reference the physically next tid */ + } + + /* 'dir' is now non-zero */ + + /* + * calculate line pointer and number of remaining items to check on + * this page. + */ + lpp = PageGetItemId(dp, lineoff); + if (dir < 0) + linesleft = lineoff - 1; + else + linesleft = lines - lineoff; + + /* + * advance the scan until we find a qualifying tuple or run out of + * stuff to scan + */ + for (;;) + { + while (linesleft >= 0) + { + if (ItemIdIsUsed(lpp)) + { + bool valid; + + tuple->t_datamcxt = NULL; + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), page, lineoff); + + /* + * if current tuple qualifies, return it. + */ + HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp, + snapshot, nkeys, key, valid); + if (valid) + { + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + return; + } + } + + /* + * otherwise move to the next item on the page + */ + --linesleft; + if (dir < 0) + { + --lpp; /* move back in this page's ItemId array */ + --lineoff; + } + else + { + ++lpp; /* move forward in this page's ItemId + * array */ + ++lineoff; + } + } + + /* + * if we get here, it means we've exhausted the items on this page + * and it's time to move to the next. + */ + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + + /* advance page, or wrap around */ + if(dir < 0) { + if(page > 0) + page = page - 1; else - { - /* - * forward scan direction - */ - if (tid == NULL) - { - page = 0; /* first page */ - lineoff = FirstOffsetNumber; /* first offnum */ - } - else - { - page = ItemPointerGetBlockNumber(tid); /* current page */ - lineoff = /* next offnum */ - OffsetNumberNext(ItemPointerGetOffsetNumber(tid)); - } - - Assert(page < pages); - - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - page); - - LockBuffer(*buffer, BUFFER_LOCK_SHARE); - - dp = (Page) BufferGetPage(*buffer); - lines = PageGetMaxOffsetNumber(dp); - /* page and lineoff now reference the physically next tid */ - } - - /* 'dir' is now non-zero */ - - /* - * calculate line pointer and number of remaining items to check on - * this page. - */ - lpp = PageGetItemId(dp, lineoff); - if (dir < 0) - linesleft = lineoff - 1; + page = pages; + } + else { + if(page < pages-1) + page = page + 1; else - linesleft = lines - lineoff; + page = 0; + } - /* - * advance the scan until we find a qualifying tuple or run out of - * stuff to scan - */ - for (;;) + /* + * return NULL if we've exhausted all the pages + */ + if(page == scan->rs_start_page) + { + if (BufferIsValid(*buffer)) + ReleaseBuffer(*buffer); + *buffer = InvalidBuffer; + tuple->t_datamcxt = NULL; + tuple->t_data = NULL; + return; + } + + Assert(page < pages); + + /*elog(NOTICE,"Page on relid %u advanced to %d", + RelationGetRelid(scan->rs_rd),page);*/ + report_page_loc(RelationGetRelid(scan->rs_rd),page); + *buffer = ReleaseAndReadBuffer(*buffer, + relation, + page); + + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + dp = (Page) BufferGetPage(*buffer); + lines = PageGetMaxOffsetNumber((Page) dp); + linesleft = lines - 1; + if (dir < 0) + { + lineoff = lines; + lpp = PageGetItemId(dp, lines); + } + else { - while (linesleft >= 0) - { - if (ItemIdIsUsed(lpp)) - { - bool valid; - - tuple->t_datamcxt = NULL; - tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), page, lineoff); - - /* - * if current tuple qualifies, return it. - */ - HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp, - snapshot, nkeys, key, valid); - if (valid) - { - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - return; - } - } - - /* - * otherwise move to the next item on the page - */ - --linesleft; - if (dir < 0) - { - --lpp; /* move back in this page's ItemId array */ - --lineoff; - } - else - { - ++lpp; /* move forward in this page's ItemId - * array */ - ++lineoff; - } - } - - /* - * if we get here, it means we've exhausted the items on this page - * and it's time to move to the next. - */ - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - - /* - * return NULL if we've exhausted all the pages - */ - if ((dir < 0) ? (page == 0) : (page + 1 >= pages)) - { - if (BufferIsValid(*buffer)) - ReleaseBuffer(*buffer); - *buffer = InvalidBuffer; - tuple->t_datamcxt = NULL; - tuple->t_data = NULL; - return; - } - - page = (dir < 0) ? (page - 1) : (page + 1); - - Assert(page < pages); - - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - page); - - LockBuffer(*buffer, BUFFER_LOCK_SHARE); - dp = (Page) BufferGetPage(*buffer); - lines = PageGetMaxOffsetNumber((Page) dp); - linesleft = lines - 1; - if (dir < 0) - { - lineoff = lines; - lpp = PageGetItemId(dp, lines); - } - else - { - lineoff = FirstOffsetNumber; - lpp = PageGetItemId(dp, FirstOffsetNumber); - } + lineoff = FirstOffsetNumber; + lpp = PageGetItemId(dp, FirstOffsetNumber); } + } } @@ -829,6 +904,7 @@ /* * Note: we depend here on the -1/0/1 encoding of ScanDirection. */ + heapgettup(scan->rs_rd, (int) direction, &(scan->rs_ctup), @@ -836,7 +912,7 @@ scan->rs_snapshot, scan->rs_nkeys, scan->rs_key, - scan->rs_nblocks); + scan->rs_nblocks,scan); if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf)) { @@ -1989,7 +2065,7 @@ scan->rs_snapshot, 0, NULL, - scan->rs_nblocks); + scan->rs_nblocks,scan); } }
--- postgresql-8.0.0/src/include/access/relscan.h 2004-12-31 14:03:21.000000000 -0800 +++ postgresql-8.0.0-ssf/src/include/access/relscan.h 2005-02-22 17:57:27.343559992 -0800 @@ -34,6 +34,7 @@ ItemPointerData rs_mctid; /* marked scan position, if any */ PgStat_Info rs_pgstat_info; /* statistics collector hook */ + BlockNumber rs_start_page; /* page where scan started */ } HeapScanDescData; typedef HeapScanDescData *HeapScanDesc;
---------------------------(end of broadcast)--------------------------- TIP 5: Have you checked our extensive FAQ? http://www.postgresql.org/docs/faq