I had an idea that might improve parallel seqscans on the same relation.

If you have lots of concurrent seqscans going on a large relation, the
cache hit ratio is very low. But, if the seqscans are concurrent on the
same relation, there may be something to gain by starting a seqscan near
the page being accessed by an already-in-progress seqscan, and wrapping
back around to that start location. That would make some use of the
shared buffers, which would otherwise just be cache pollution.

I made a proof-of-concept implementation, which is entirely in heapam.c,
except for one addition to the HeapScanDesc struct in relscan.h. It is
not at all up to production quality; there are things I know that need
to be addressed. Basically, I just modified heapam.c to be able to start
at any page in the relation. Then, every time it reads a new page, I
have it mark the relation's oid and the page number in a shared mem
segment. Everytime a new scan is started, it reads the shared mem
segment, and if the relation's oid matches, it starts the scan at the
page number it found in the shared memory. Otherwise, it starts the scan
at 0.

There are a couple obvious issues, one is that my whole implementation
doesn't account for reverse scans at all (since initscan doesn't know
what direction the scan will move in), but that shouldn't be a major
problem since at worst it will be the current behavior (aside: can
someone tell me how to force reverse scans so I can test that better?).
Another is that there's a race condition with the shared mem, and that's
out of pure laziness on my part.

This method is really only effective at all if there is a significant
amount of disk i/o. If it's pulling the data from O/S buffers the
various scans will diverge too much and not be using eachother's shared
buffers.

I tested with shared_buffers=500 and all stats on. I used 60 threads
performing 30 seqscans each in my script ssf.rb (I refer to my
modification as "sequential scan follower" or ssf). 

Here are some results with my modifications:
$ time ./ssf.rb # my script

real    4m22.476s
user    0m0.389s
sys     0m0.186s

test=# select relpages from pg_class where relname='test_ssf';
 relpages
----------
     1667
(1 row)

test=# select count(*) from test_ssf;
 count
--------
 200000
(1 row)

test=# select pg_stat_get_blocks_hit(17232) as hit,
pg_stat_get_blocks_fetched(17232) as total;
  hit   |  total
--------+---------
 971503 | 3353963
(1 row)

Or, approx. 29% cache hit.

Here are the results without my modifications:

test=# select relpages from pg_class where relname='test_ssf';
 relpages
----------
     1667
(1 row)

test=# select count(*) from test_ssf;
 count
--------
 200000
(1 row)

test=# select pg_stat_get_blocks_hit(17231) as hit,
pg_stat_get_blocks_fetched(17231) as total;
  hit   |  total
--------+---------
 199999 | 3353963
(1 row)

Or, approx. 6% cache hit. Note: the oid is different, because I have two
seperately initdb'd data directories, one for the modified version, one
for the unmodified 8.0.0.

This is the first time I've really modified the PG source code to do
anything that looked promising, so this is more of a question than
anything else. Is it promising? Is this a potentially good approach? I'm
happy to post more test data and more documentation, and I'd also be
happy to bring the code to production quality. However, before I spend
too much more time on that, I'd like to get a general response from a
3rd party to let me know if I'm off base.

Regards,
        Jeff Davis

--- postgresql-8.0.0/src/backend/access/heap/heapam.c	2004-12-31 13:59:16.000000000 -0800
+++ postgresql-8.0.0-ssf/src/backend/access/heap/heapam.c	2005-02-24 20:37:24.596626668 -0800
@@ -65,6 +65,50 @@
  * ----------------------------------------------------------------
  */
 
+
+#include <sys/shm.h>
+#include <sys/types.h>
+#include <sys/ipc.h>
+/*
+ Retrieves a location of an in-progress seqscan on relid
+ and returns the page location
+*/
+static BlockNumber get_page_loc(Oid relid) {
+  int shmid;
+  char *shm;
+  Oid shmrelid;
+  BlockNumber loc;
+  shmid = shmget(0x11aa55cc,(sizeof(Oid)+sizeof(BlockNumber)),
+		 0666|IPC_CREAT);
+  shm = shmat(shmid,NULL,0);
+  shmrelid = *((Oid*)shm);
+  loc = *((BlockNumber*)(shm+sizeof(Oid)));
+  shmdt(shm);
+  /*elog(NOTICE,"Getting shm: %u %u",shmrelid,loc);*/
+  if (shmrelid == relid)
+    return loc;
+  else
+    return 0;
+}
+
+/*
+ Places (relid,loc) in a shared mem structure
+*/
+static int report_page_loc(Oid relid, BlockNumber loc) 
+{
+  int shmid;
+  char *shm;
+  shmid = shmget(0x11aa55cc,(sizeof(Oid)+sizeof(BlockNumber)),
+		 0666|IPC_CREAT);
+  shm = shmat(shmid,NULL,0);
+
+  /*elog(NOTICE,"Setting shm: %u %u",relid,loc);*/
+  *((Oid*)shm) = relid;
+  *((BlockNumber*)(shm+sizeof(Oid))) = loc;
+  shmdt(shm);
+  return 0;
+}
+
 /* ----------------
  *		initscan - scan code common to heap_beginscan and heap_rescan
  * ----------------
@@ -85,6 +129,18 @@
 	scan->rs_ctup.t_data = NULL;
 	scan->rs_cbuf = InvalidBuffer;
 
+	/* initialize start page */
+	scan->rs_start_page = get_page_loc(RelationGetRelid(scan->rs_rd));
+	  /*
+	    if(scan->rs_nblocks)
+	  scan->rs_start_page = 5 % scan->rs_nblocks;
+	else
+	  scan->rs_start_page = 0;
+	  */
+	/*elog(NOTICE,"Scan started on relid: %u, pages: %d, page: %d",
+	     RelationGetRelid(scan->rs_rd),
+	     scan->rs_nblocks,
+	     scan->rs_start_page);*/
 	/* we don't have a marked position... */
 	ItemPointerSetInvalid(&(scan->rs_mctid));
 
@@ -115,261 +171,280 @@
 		   Snapshot snapshot,
 		   int nkeys,
 		   ScanKey key,
-		   BlockNumber pages)
+		   BlockNumber pages,
+	   HeapScanDesc scan)
 {
-	ItemId		lpp;
-	Page		dp;
-	BlockNumber page;
-	int			lines;
-	OffsetNumber lineoff;
-	int			linesleft;
-	ItemPointer tid;
-
-	tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
-
-	/*
-	 * debugging stuff
-	 *
-	 * check validity of arguments, here and for other functions too Note: no
-	 * locking manipulations needed--this is a local function
-	 */
+  ItemId		lpp;
+  Page		dp;
+  BlockNumber page;
+  int			lines;
+  OffsetNumber lineoff;
+  int			linesleft;
+  ItemPointer tid;
+
+  tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self);
+
+  /*
+   * debugging stuff
+   *
+   * check validity of arguments, here and for other functions too Note: no
+   * locking manipulations needed--this is a local function
+   */
 #ifdef	HEAPDEBUGALL
-	if (ItemPointerIsValid(tid))
-		elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
-			 RelationGetRelationName(relation), tid, tid->ip_blkid,
-			 tid->ip_posid, dir);
-	else
-		elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
-			 RelationGetRelationName(relation), tid, dir);
-
-	elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
-
-	elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
-		 relation->rd_rel->relkind, RelationGetRelationName(relation),
-		 snapshot);
+  if (ItemPointerIsValid(tid))
+    elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
+	 RelationGetRelationName(relation), tid, tid->ip_blkid,
+	 tid->ip_posid, dir);
+  else
+    elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
+	 RelationGetRelationName(relation), tid, dir);
+
+  elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);
+
+  elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p",
+       relation->rd_rel->relkind, RelationGetRelationName(relation),
+       snapshot);
 #endif   /* HEAPDEBUGALL */
 
-	if (!ItemPointerIsValid(tid))
-	{
-		Assert(!PointerIsValid(tid));
-		tid = NULL;
-	}
-
-	tuple->t_tableOid = relation->rd_id;
-
-	/*
-	 * return null immediately if relation is empty
-	 */
-	if (pages == 0)
-	{
-		if (BufferIsValid(*buffer))
-			ReleaseBuffer(*buffer);
-		*buffer = InvalidBuffer;
-		tuple->t_datamcxt = NULL;
-		tuple->t_data = NULL;
-		return;
-	}
-
-	/*
-	 * calculate next starting lineoff, given scan direction
-	 */
-	if (dir == 0)
-	{
-		/*
-		 * ``no movement'' scan direction: refetch same tuple
-		 */
-		if (tid == NULL)
-		{
-			if (BufferIsValid(*buffer))
-				ReleaseBuffer(*buffer);
-			*buffer = InvalidBuffer;
-			tuple->t_datamcxt = NULL;
-			tuple->t_data = NULL;
-			return;
-		}
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   ItemPointerGetBlockNumber(tid));
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
-		dp = (Page) BufferGetPage(*buffer);
-		lineoff = ItemPointerGetOffsetNumber(tid);
-		lpp = PageGetItemId(dp, lineoff);
-
-		tuple->t_datamcxt = NULL;
-		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
-		tuple->t_len = ItemIdGetLength(lpp);
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-
-		return;
-	}
-	else if (dir < 0)
-	{
-		/*
-		 * reverse scan direction
-		 */
-		if (tid == NULL)
-		{
-			page = pages - 1;	/* final page */
-		}
-		else
-		{
-			page = ItemPointerGetBlockNumber(tid);		/* current page */
-		}
-
-		Assert(page < pages);
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
-		dp = (Page) BufferGetPage(*buffer);
-		lines = PageGetMaxOffsetNumber(dp);
-		if (tid == NULL)
-		{
-			lineoff = lines;	/* final offnum */
-		}
-		else
-		{
-			lineoff =			/* previous offnum */
-				OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
-		}
-		/* page and lineoff now reference the physically previous tid */
-	}
+  if (!ItemPointerIsValid(tid))
+    {
+      Assert(!PointerIsValid(tid));
+      tid = NULL;
+    }
+
+  tuple->t_tableOid = relation->rd_id;
+
+  /*
+   * return null immediately if relation is empty
+   */
+  if (pages == 0)
+    {
+      if (BufferIsValid(*buffer))
+	ReleaseBuffer(*buffer);
+      *buffer = InvalidBuffer;
+      tuple->t_datamcxt = NULL;
+      tuple->t_data = NULL;
+      return;
+    }
+
+  /*
+   * calculate next starting lineoff, given scan direction
+   */
+  if (dir == 0)
+    {
+      /*elog(NOTICE,"NoMovement Scan started\n");*/
+      /*
+       * ``no movement'' scan direction: refetch same tuple
+       */
+      if (tid == NULL)
+	{
+	  if (BufferIsValid(*buffer))
+	    ReleaseBuffer(*buffer);
+	  *buffer = InvalidBuffer;
+	  tuple->t_datamcxt = NULL;
+	  tuple->t_data = NULL;
+	  return;
+	}
+
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     ItemPointerGetBlockNumber(tid));
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+
+      dp = (Page) BufferGetPage(*buffer);
+      lineoff = ItemPointerGetOffsetNumber(tid);
+      lpp = PageGetItemId(dp, lineoff);
+
+      tuple->t_datamcxt = NULL;
+      tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+      tuple->t_len = ItemIdGetLength(lpp);
+      LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+
+      return;
+    }
+  else if (dir < 0)
+    {
+      /*
+       * reverse scan direction
+       */
+      if (tid == NULL)
+	{
+	  /*page = pages-1;*/	/* final page */
+	  page = scan->rs_start_page;
+	}
+      else
+	{
+	  page = ItemPointerGetBlockNumber(tid);  /* current page */
+	}
+
+      Assert(page < pages);
+
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     page);
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+
+      dp = (Page) BufferGetPage(*buffer);
+      lines = PageGetMaxOffsetNumber(dp);
+      if (tid == NULL)
+	{
+	  lineoff = lines;	/* final offnum */
+	}
+      else
+	{
+	  lineoff =			/* previous offnum */
+	    OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
+	}
+      /* page and lineoff now reference the physically previous tid */
+    }
+  else
+    {
+      /*
+       * forward scan direction
+       */
+      if (tid == NULL)
+	{
+	  page = scan->rs_start_page;     /* first page */
+	  lineoff = FirstOffsetNumber;		/* first offnum */
+	}
+      else
+	{
+	  page = ItemPointerGetBlockNumber(tid);	  /* current page */
+	  lineoff =			/* next offnum */
+	    OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
+	}
+
+      Assert(page < pages);
+
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     page);
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+
+      dp = (Page) BufferGetPage(*buffer);
+      lines = PageGetMaxOffsetNumber(dp);
+      /* page and lineoff now reference the physically next tid */
+    }
+
+  /* 'dir' is now non-zero */
+
+  /*
+   * calculate line pointer and number of remaining items to check on
+   * this page.
+   */
+  lpp = PageGetItemId(dp, lineoff);
+  if (dir < 0)
+    linesleft = lineoff - 1;
+  else
+    linesleft = lines - lineoff;
+
+  /*
+   * advance the scan until we find a qualifying tuple or run out of
+   * stuff to scan
+   */
+  for (;;)
+    {
+      while (linesleft >= 0)
+	{
+	  if (ItemIdIsUsed(lpp))
+	    {
+	      bool		valid;
+
+	      tuple->t_datamcxt = NULL;
+	      tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
+	      tuple->t_len = ItemIdGetLength(lpp);
+	      ItemPointerSet(&(tuple->t_self), page, lineoff);
+
+	      /*
+	       * if current tuple qualifies, return it.
+	       */
+	      HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
+				 snapshot, nkeys, key, valid);
+	      if (valid)
+		{
+		  LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+		  return;
+		}
+	    }
+
+	  /*
+	   * otherwise move to the next item on the page
+	   */
+	  --linesleft;
+	  if (dir < 0)
+	    {
+	      --lpp;			/* move back in this page's ItemId array */
+	      --lineoff;
+	    }
+	  else
+	    {
+	      ++lpp;			/* move forward in this page's ItemId
+					 * array */
+	      ++lineoff;
+	    }
+	}
+
+      /*
+       * if we get here, it means we've exhausted the items on this page
+       * and it's time to move to the next.
+       */
+      LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+
+		
+      /* advance page, or wrap around */
+      if(dir < 0) {
+	if(page > 0)
+	  page = page - 1;
 	else
-	{
-		/*
-		 * forward scan direction
-		 */
-		if (tid == NULL)
-		{
-			page = 0;			/* first page */
-			lineoff = FirstOffsetNumber;		/* first offnum */
-		}
-		else
-		{
-			page = ItemPointerGetBlockNumber(tid);		/* current page */
-			lineoff =			/* next offnum */
-				OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
-		}
-
-		Assert(page < pages);
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-
-		dp = (Page) BufferGetPage(*buffer);
-		lines = PageGetMaxOffsetNumber(dp);
-		/* page and lineoff now reference the physically next tid */
-	}
-
-	/* 'dir' is now non-zero */
-
-	/*
-	 * calculate line pointer and number of remaining items to check on
-	 * this page.
-	 */
-	lpp = PageGetItemId(dp, lineoff);
-	if (dir < 0)
-		linesleft = lineoff - 1;
+	  page = pages;
+      }
+      else {
+	if(page < pages-1)
+	  page = page + 1;
 	else
-		linesleft = lines - lineoff;
+	  page = 0;
+      }
 
-	/*
-	 * advance the scan until we find a qualifying tuple or run out of
-	 * stuff to scan
-	 */
-	for (;;)
+      /*
+       * return NULL if we've exhausted all the pages
+       */
+      if(page == scan->rs_start_page)
+	{
+	  if (BufferIsValid(*buffer))
+	    ReleaseBuffer(*buffer);
+	  *buffer = InvalidBuffer;
+	  tuple->t_datamcxt = NULL;
+	  tuple->t_data = NULL;
+	  return;
+	}
+
+      Assert(page < pages);
+
+      /*elog(NOTICE,"Page on relid %u advanced to %d",
+	RelationGetRelid(scan->rs_rd),page);*/
+      report_page_loc(RelationGetRelid(scan->rs_rd),page);
+      *buffer = ReleaseAndReadBuffer(*buffer,
+				     relation,
+				     page);
+
+      LockBuffer(*buffer, BUFFER_LOCK_SHARE);
+      dp = (Page) BufferGetPage(*buffer);
+      lines = PageGetMaxOffsetNumber((Page) dp);
+      linesleft = lines - 1;
+      if (dir < 0)
+	{
+	  lineoff = lines;
+	  lpp = PageGetItemId(dp, lines);
+	}
+      else
 	{
-		while (linesleft >= 0)
-		{
-			if (ItemIdIsUsed(lpp))
-			{
-				bool		valid;
-
-				tuple->t_datamcxt = NULL;
-				tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
-				tuple->t_len = ItemIdGetLength(lpp);
-				ItemPointerSet(&(tuple->t_self), page, lineoff);
-
-				/*
-				 * if current tuple qualifies, return it.
-				 */
-				HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
-								   snapshot, nkeys, key, valid);
-				if (valid)
-				{
-					LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-					return;
-				}
-			}
-
-			/*
-			 * otherwise move to the next item on the page
-			 */
-			--linesleft;
-			if (dir < 0)
-			{
-				--lpp;			/* move back in this page's ItemId array */
-				--lineoff;
-			}
-			else
-			{
-				++lpp;			/* move forward in this page's ItemId
-								 * array */
-				++lineoff;
-			}
-		}
-
-		/*
-		 * if we get here, it means we've exhausted the items on this page
-		 * and it's time to move to the next.
-		 */
-		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-
-		/*
-		 * return NULL if we've exhausted all the pages
-		 */
-		if ((dir < 0) ? (page == 0) : (page + 1 >= pages))
-		{
-			if (BufferIsValid(*buffer))
-				ReleaseBuffer(*buffer);
-			*buffer = InvalidBuffer;
-			tuple->t_datamcxt = NULL;
-			tuple->t_data = NULL;
-			return;
-		}
-
-		page = (dir < 0) ? (page - 1) : (page + 1);
-
-		Assert(page < pages);
-
-		*buffer = ReleaseAndReadBuffer(*buffer,
-									   relation,
-									   page);
-
-		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
-		dp = (Page) BufferGetPage(*buffer);
-		lines = PageGetMaxOffsetNumber((Page) dp);
-		linesleft = lines - 1;
-		if (dir < 0)
-		{
-			lineoff = lines;
-			lpp = PageGetItemId(dp, lines);
-		}
-		else
-		{
-			lineoff = FirstOffsetNumber;
-			lpp = PageGetItemId(dp, FirstOffsetNumber);
-		}
+	  lineoff = FirstOffsetNumber;
+	  lpp = PageGetItemId(dp, FirstOffsetNumber);
 	}
+    }
 }
 
 
@@ -829,6 +904,7 @@
 	/*
 	 * Note: we depend here on the -1/0/1 encoding of ScanDirection.
 	 */
+
 	heapgettup(scan->rs_rd,
 			   (int) direction,
 			   &(scan->rs_ctup),
@@ -836,7 +912,7 @@
 			   scan->rs_snapshot,
 			   scan->rs_nkeys,
 			   scan->rs_key,
-			   scan->rs_nblocks);
+			   scan->rs_nblocks,scan);
 
 	if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
 	{
@@ -1989,7 +2065,7 @@
 				   scan->rs_snapshot,
 				   0,
 				   NULL,
-				   scan->rs_nblocks);
+				   scan->rs_nblocks,scan);
 	}
 }
 
--- postgresql-8.0.0/src/include/access/relscan.h	2004-12-31 14:03:21.000000000 -0800
+++ postgresql-8.0.0-ssf/src/include/access/relscan.h	2005-02-22 17:57:27.343559992 -0800
@@ -34,6 +34,7 @@
 	ItemPointerData rs_mctid;	/* marked scan position, if any */
 
 	PgStat_Info rs_pgstat_info; /* statistics collector hook */
+  BlockNumber rs_start_page; /* page where scan started */
 } HeapScanDescData;
 
 typedef HeapScanDescData *HeapScanDesc;
---------------------------(end of broadcast)---------------------------
TIP 5: Have you checked our extensive FAQ?

               http://www.postgresql.org/docs/faq

Reply via email to