I have prepared a patch to backends/commands/async,c to speed up
duplicate elimination. rdtsc timing results are sent back via ereport.
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***
*** 326,337 typedef struct Notification
--- 326,353
{
char *channel; /* channel name */
char *payload; /* payload string (can be empty) */
+ uint32 hash ; /* speed up search for duplicates */
+ struct Notification *left ;
+ struct Notification *right ;
+
} Notification;
static List *pendingNotifies = NIL; /* list of Notifications */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
+ static Notification *treerootNotifies = NULL ; /* speed up search for duplicates */
+ #warning rdtsc just included for lack of a suitable reference
+ static uint64 rdtsc_total ;
+ static int rdtsc_count ;
+ static inline uint64 rdtsc(void)
+ {
+ uint64 x;
+ __asm__ volatile (.byte 0x0f, 0x31 : =A (x));
+ return x;
+ }
+
+
/*
* State for inbound notifications consists of two flags: one saying whether
* the signal handler is currently allowed to call ProcessIncomingNotify
***
*** 382,389 static void ProcessIncomingNotify(void);
static void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
- static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
static void ClearPendingActionsAndNotifies(void);
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
--- 398,408
static void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
static void ClearPendingActionsAndNotifies(void);
+ static Notification **AsyncSearchPendingNotifies(const char *channel,
+ const char *payload,
+ uint32 *hash);
+ /* Does pendingNotifies include the given channel/payload? */
/*
* We will work on the page range of 0..QUEUE_MAX_PAGE.
***
*** 533,538 Async_Notify(const char *channel, const char *payload)
--- 552,560
{
Notification *n;
MemoryContext oldcontext;
+ Notification **nn ;
+ uint64 t1,t2 ;
+ uint32 hash ;
if (Trace_notify)
elog(DEBUG1, Async_Notify(%s), channel);
***
*** 557,564 Async_Notify(const char *channel, const char *payload)
}
/* no point in making duplicate entries in the list ... */
! if (AsyncExistsPendingNotify(channel, payload))
! return;
/*
* The notification list needs to live until end of transaction, so store
--- 579,592
}
/* no point in making duplicate entries in the list ... */
! t1 = rdtsc() ;
! nn = AsyncSearchPendingNotifies(channel,payload,hash) ;
! t2 = rdtsc();
! rdtsc_total += t2-t1 ;
! rdtsc_count += 1 ;
! if ( !nn ) /* this was a duplicate entry */
! return ;
!
/*
* The notification list needs to live until end of transaction, so store
***
*** 566,584 Async_Notify(const char *channel, const char *payload)
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
! n = (Notification *) palloc(sizeof(Notification));
n-channel = pstrdup(channel);
if (payload)
n-payload = pstrdup(payload);
else
n-payload = ;
! /*
! * We want to preserve the order so we need to append every notification.
! * See comments at AsyncExistsPendingNotify().
*/
pendingNotifies = lappend(pendingNotifies, n);
-
MemoryContextSwitchTo(oldcontext);
}
--- 594,625
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
! n = (Notification *) palloc(sizeof(*n));
n-channel = pstrdup(channel);
if (payload)
n-payload = pstrdup(payload);
else
n-payload = ;
+
+ /* append to search tree */
+ n-left = NULL ;
+ n-right = NULL ;
+ n-hash = hash ;
+ *nn = n ;
! /* We want to preserve the order so we need to append every notification.
! * As we are not checking our parents' lists, we can still get duplicates
! * in combination with subtransactions, like in:
! *
! * begin;
! * notify foo '1';
! * savepoint foo;
! * notify foo '1';
! * commit;
! *--
*/
+
pendingNotifies = lappend(pendingNotifies, n);
MemoryContextSwitchTo(oldcontext);
}
***
*** 2149,2156 NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
elog(INFO, NOTIFY for \%s\ payload \%s\, channel, payload);
}
! /* Does pendingNotifies include the given channel/payload? */
! static bool
AsyncExistsPendingNotify(const char *channel, const char *payload)
{
ListCell *p;
--- 2190,2314
elog(INFO, NOTIFY for \%s\ payload \%s\, channel, payload);
}
!
! static inline uint32 murmurhash2( const char* key, uint32 seed )
! #warning yet another copy of murmurhash2
! #warning should we use murmurhash3 or one of the hash functions from