I have prepared a patch to backends/commands/async,c to speed up
duplicate elimination. rdtsc timing results are sent back via ereport.



*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 326,337 **** typedef struct Notification
--- 326,353 ----
  {
  	char	   *channel;		/* channel name */
  	char	   *payload;		/* payload string (can be empty) */
+ 	uint32      hash ;			/* speed up search for duplicates */
+     struct Notification *left  ;
+     struct Notification *right ;
+ 
  } Notification;
  
  static List *pendingNotifies = NIL;		/* list of Notifications */
  
  static List *upperPendingNotifies = NIL;		/* list of upper-xact lists */
  
+ static Notification *treerootNotifies = NULL ; /* speed up search for duplicates */
+ #warning rdtsc just included for lack of a suitable reference
+ static uint64   rdtsc_total ;
+ static int      rdtsc_count ;
+ static inline uint64 rdtsc(void)
+ {
+ 	uint64  x;
+ 	__asm__ volatile (".byte 0x0f, 0x31" : "=A" (x));
+ 	return x;
+ }
+ 
+ 
  /*
   * State for inbound notifications consists of two flags: one saying whether
   * the signal handler is currently allowed to call ProcessIncomingNotify
***************
*** 382,389 **** static void ProcessIncomingNotify(void);
  static void NotifyMyFrontEnd(const char *channel,
  				 const char *payload,
  				 int32 srcPid);
- static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
  static void ClearPendingActionsAndNotifies(void);
  
  /*
   * We will work on the page range of 0..QUEUE_MAX_PAGE.
--- 398,408 ----
  static void NotifyMyFrontEnd(const char *channel,
  				 const char *payload,
  				 int32 srcPid);
  static void ClearPendingActionsAndNotifies(void);
+ static Notification **AsyncSearchPendingNotifies(const char *channel,
+                 const char *payload,
+                 uint32 *hash);
+ /* Does pendingNotifies include the given channel/payload? */
  
  /*
   * We will work on the page range of 0..QUEUE_MAX_PAGE.
***************
*** 533,538 **** Async_Notify(const char *channel, const char *payload)
--- 552,560 ----
  {
  	Notification *n;
  	MemoryContext oldcontext;
+ 	Notification **nn ; 
+ 	uint64 t1,t2 ; 
+ 	uint32 hash ; 
  
  	if (Trace_notify)
  		elog(DEBUG1, "Async_Notify(%s)", channel);
***************
*** 557,564 **** Async_Notify(const char *channel, const char *payload)
  	}
  
  	/* no point in making duplicate entries in the list ... */
! 	if (AsyncExistsPendingNotify(channel, payload))
! 		return;
  
  	/*
  	 * The notification list needs to live until end of transaction, so store
--- 579,592 ----
  	}
  
  	/* no point in making duplicate entries in the list ... */
! 	t1 = rdtsc() ;
!     nn = AsyncSearchPendingNotifies(channel,payload,&hash) ;
!     t2 = rdtsc();
!     rdtsc_total += t2-t1 ;
!     rdtsc_count += 1 ;
!     if ( !nn ) /* this was a duplicate entry */
!         return ;
! 
  
  	/*
  	 * The notification list needs to live until end of transaction, so store
***************
*** 566,584 **** Async_Notify(const char *channel, const char *payload)
  	 */
  	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
! 	n = (Notification *) palloc(sizeof(Notification));
  	n->channel = pstrdup(channel);
  	if (payload)
  		n->payload = pstrdup(payload);
  	else
  		n->payload = "";
  
! 	/*
! 	 * We want to preserve the order so we need to append every notification.
! 	 * See comments at AsyncExistsPendingNotify().
  	 */
  	pendingNotifies = lappend(pendingNotifies, n);
- 
  	MemoryContextSwitchTo(oldcontext);
  }
  
--- 594,625 ----
  	 */
  	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
! 	n = (Notification *) palloc(sizeof(*n));
  	n->channel = pstrdup(channel);
  	if (payload)
  		n->payload = pstrdup(payload);
  	else
  		n->payload = "";
+  
+ 	/* append to search tree */
+ 	n->left  = NULL ; 
+ 	n->right = NULL ; 
+ 	n->hash  = hash ; 
+ 	*nn = n ;
  
! 	/* We want to preserve the order so we need to append every notification. 
! 	 * As we are not checking our parents' lists, we can still get duplicates
! 	 * in combination with subtransactions, like in:
! 	 *
! 	 * begin;
! 	 * notify foo '1';
! 	 * savepoint foo;
! 	 * notify foo '1';
! 	 * commit;
! 	 *----------
  	 */
+ 
  	pendingNotifies = lappend(pendingNotifies, n);
  	MemoryContextSwitchTo(oldcontext);
  }
  
***************
*** 2149,2156 **** NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
  		elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
  }
  
! /* Does pendingNotifies include the given channel/payload? */
! static bool
  AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
  	ListCell   *p;
--- 2190,2314 ----
  		elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
  }
  
! 
! static inline uint32 murmurhash2( const char* key, uint32 seed )
! #warning yet another copy of murmurhash2
! #warning should we use murmurhash3 or one of the hash functions from backend/utils/hash/hashfn.c?
! {
!     const uint32  m = 0x5bd1e995;
!     const int r = 24;
!     int len = strlen(key) ;
!     uint32 h = seed ^ len;
!     uint32 k ;
!     const unsigned char * data = (const unsigned char *)key;
!     while(len >= 4)
!     {
!         memcpy(&k,data,4) ;
! 
!         k *= m;
!         k ^= k >> r;
!         k *= m;
! 
!         h *= m;
!         h ^= k;
! 
!         data += 4;
!         len -= 4;
!     }
! 
!     /* Handle the last few bytes of the input array */
! 
!     switch(len)
!     {
!     case 3: h ^= data[2] << 16;
!     case 2: h ^= data[1] << 8;
!     case 1: h ^= data[0];
!             h *= m;
!     };
! 
!     /* Do a few final mixes of the hash to ensure the last few */
!     /* bytes are well-incorporated. */
! 
!     h ^= h >> 13;
!     h *= m;
!     h ^= h >> 15;
! 
!     return h;
! }
! 
! static Notification **
! AsyncSearchPendingNotifies ( const char *channel, const char *payload , uint32 *phash )
! /* return NULL if (channel,payload) is a duplicate entry 
!  * otherwise, return an insert position and set phash.
!  * a NULL payload is treated as an empty string.
!  * the tree search is never worse than a linear list and the 
!  * mixing properties of the hash function will keep the average depth low 
!  */
! {
!     Notification   *t ;
!     Notification  **p ;
!     uint32 hash ; 
!     int depth = 0 ; 
!     
!     if ( !payload ) 
!         payload = "" ; 
! 
! 	/*----------
! 	 * We need to append new elements to the end of the list in order to keep
! 	 * the order. 
! 	 * As we are not checking our parents' lists, we can still get duplicates
! 	 * in combination with subtransactions, like in:
! 	 *
! 	 * begin;
! 	 * notify foo '1';
! 	 * savepoint foo;
! 	 * notify foo '1';
! 	 * commit;
! 	 *----------
! 	 */
! 	/* avoid warning about possibly uninitialized value */
! 	*phash = 0 ;
! 	if ( pendingNotifies ) 
! 	{
! 		Notification *n = (Notification *) llast(pendingNotifies);
! 		if (strcmp(n->channel, channel) == 0 &&
! 			strcmp(n->payload, payload) == 0)
! 		return NULL ; 
! 	}
! 
!     hash = murmurhash2(channel,691) ; 
!     hash = murmurhash2(payload,hash) ; /* sadly, not incremental */
!     *phash = hash ; 
! 
! 	t =  treerootNotifies ; 
! 	p = &treerootNotifies ; 
!     while ( t ) 
!     {
!         ++depth ; 
!         if ( hash < t->hash ) 
!         {
!             p = &t->left ; 
!             t = t->left ;
!         }
!         else if ( hash > t->hash ) 
!         {
!             p = &t->right ; 
!             t = t->right ; 
!         }
!         else 
!         {
!             if ( 0 == strcmp(t->channel,channel) && 0 == strcmp(t->payload,payload) ) 
!                 return NULL ;
!             p = &t->left ;
!             t = t->left ;
!         }
!     }
!     return p ;
! }
! 
! 
! #if 0
! bool 
  AsyncExistsPendingNotify(const char *channel, const char *payload)
  {
  	ListCell   *p;
***************
*** 2196,2201 **** AsyncExistsPendingNotify(const char *channel, const char *payload)
--- 2354,2360 ----
  
  	return false;
  }
+ #endif
  
  /* Clear the pendingActions and pendingNotifies lists. */
  static void
***************
*** 2210,2213 **** ClearPendingActionsAndNotifies(void)
--- 2369,2380 ----
  	 */
  	pendingActions = NIL;
  	pendingNotifies = NIL;
+ 
+ 	if ( rdtsc_count ) 
+ 		ereport(NOTICE,(errmsg("AsyncSearchPendingNotifies: rtdsc timing: %.2f (%d)", (double)rdtsc_total/rdtsc_count, rdtsc_count))) ; 
+ 
+ 	treerootNotifies = NULL ; 
+ 	rdtsc_total = 0 ; 
+ 	rdtsc_count = 0 ;
  }
+ 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to