Christoph Berg <christoph.b...@credativ.de> writes:
> Warming up an old thread here - we ran into the same problem.

Thanks for the example proving it is repeatable.

I dug into this a bit and found the problem.  When a page of pg_notify
is filled, asyncQueueAddEntries() advances the global QUEUE_HEAD to
point to the next page before it calls SimpleLruZeroPage() to make that
page exist in slru.c.  But SimpleLruZeroPage could fail --- even though
it doesn't attempt to write out the new page, it might need to dump
some existing dirty page from shared memory before it can allocate a
slot to the new page.  If it does fail, then QUEUE_HEAD is pointing
at a page number that doesn't exist either on disk or in slru.c buffers.
Subsequent attempts to execute asyncQueueAddEntries() will fail because
SimpleLruReadPage() says "page? what page?".  And there is noplace else
in async.c that would try to create the missing page.  This error will
persist until the server is restarted (thus resetting the state of
pg_notify), even if the underlying disk-full condition is cleared.

The solution is to make sure we don't advance QUEUE_HEAD into the new
page until SimpleLruZeroPage has created it.  There are a couple of ways
the code could be fixed, but what I chose to do in the attached proposed
patch is to make asyncQueueAddEntries work with a local queue_head
pointer, which is only copied back to shared memory on successful exit.

Comments, objections?

                        regards, tom lane

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index fcb087ed15dfbb4d567ee0c742e1db8ec1353a2d..6fcd9093268e7bc5aaf6102fa9d6adcdcced1d6d 100644
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
*************** static ListCell *
*** 1285,1290 ****
--- 1285,1291 ----
  asyncQueueAddEntries(ListCell *nextNotify)
  {
  	AsyncQueueEntry qe;
+ 	QueuePosition queue_head;
  	int			pageno;
  	int			offset;
  	int			slotno;
*************** asyncQueueAddEntries(ListCell *nextNotif
*** 1292,1299 ****
  	/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
  	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
  
  	/* Fetch the current page */
! 	pageno = QUEUE_POS_PAGE(QUEUE_HEAD);
  	slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
  	/* Note we mark the page dirty before writing in it */
  	AsyncCtl->shared->page_dirty[slotno] = true;
--- 1293,1313 ----
  	/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
  	LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
  
+ 	/*
+ 	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
+ 	 * memory upon exiting.  The reason for this is that if we have to advance
+ 	 * to a new page, SimpleLruZeroPage might fail (out of disk space, for
+ 	 * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
+ 	 * subsequent insertions would try to put entries into a page that slru.c
+ 	 * thinks doesn't exist yet.)  So, use a local position variable.  Note
+ 	 * that if we do fail, any already-inserted queue entries are forgotten;
+ 	 * this is okay, since they'd be useless anyway after our transaction
+ 	 * rolls back.
+ 	 */
+ 	queue_head = QUEUE_HEAD;
+ 
  	/* Fetch the current page */
! 	pageno = QUEUE_POS_PAGE(queue_head);
  	slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
  	/* Note we mark the page dirty before writing in it */
  	AsyncCtl->shared->page_dirty[slotno] = true;
*************** asyncQueueAddEntries(ListCell *nextNotif
*** 1305,1311 ****
  		/* Construct a valid queue entry in local variable qe */
  		asyncQueueNotificationToEntry(n, &qe);
  
! 		offset = QUEUE_POS_OFFSET(QUEUE_HEAD);
  
  		/* Check whether the entry really fits on the current page */
  		if (offset + qe.length <= QUEUE_PAGESIZE)
--- 1319,1325 ----
  		/* Construct a valid queue entry in local variable qe */
  		asyncQueueNotificationToEntry(n, &qe);
  
! 		offset = QUEUE_POS_OFFSET(queue_head);
  
  		/* Check whether the entry really fits on the current page */
  		if (offset + qe.length <= QUEUE_PAGESIZE)
*************** asyncQueueAddEntries(ListCell *nextNotif
*** 1331,1338 ****
  			   &qe,
  			   qe.length);
  
! 		/* Advance QUEUE_HEAD appropriately, and note if page is full */
! 		if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length))
  		{
  			/*
  			 * Page is full, so we're done here, but first fill the next page
--- 1345,1352 ----
  			   &qe,
  			   qe.length);
  
! 		/* Advance queue_head appropriately, and detect if page is full */
! 		if (asyncQueueAdvance(&(queue_head), qe.length))
  		{
  			/*
  			 * Page is full, so we're done here, but first fill the next page
*************** asyncQueueAddEntries(ListCell *nextNotif
*** 1342,1353 ****
  			 * asyncQueueIsFull() ensured that there is room to create this
  			 * page without overrunning the queue.
  			 */
! 			slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
  			/* And exit the loop */
  			break;
  		}
  	}
  
  	LWLockRelease(AsyncCtlLock);
  
  	return nextNotify;
--- 1356,1370 ----
  			 * asyncQueueIsFull() ensured that there is room to create this
  			 * page without overrunning the queue.
  			 */
! 			slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
  			/* And exit the loop */
  			break;
  		}
  	}
  
+ 	/* Success, so update the global QUEUE_HEAD */
+ 	QUEUE_HEAD = queue_head;
+ 
  	LWLockRelease(AsyncCtlLock);
  
  	return nextNotify;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to