=?UTF-8?Q?Filip_Rembia=C5=82kowski?= <filip.rembialkow...@gmail.com> writes:
> Still no hash table fallback is implemented, so this is *not* a
> performance improvement. Only a little more flexibility.

I think that we'd probably be better off fixing the root performance issue
than adding semantic complexity to bypass it.  Especially since, if you
don't de-duplicate, that's going to cost you when it comes time to
actually send the notifications, receive them, forward them to clients,
and process them in the clients.

Admittedly, if the app *knows* that it's generating non-duplicate events,
maybe it'd be all right to skip checking that.  But if we can make the
check cheap, I'd just as soon keep it.

Accordingly, I looked into making a hash table when there are more than
a small number of notifications pending, and attached is a lightly-tested
version of that.  This seems to be more or less similar speed to the
existing code for up to 100 or so distinct notifies, but it soon pulls
away above that.

A point that needs discussion is that this patch, unlike the existing
code, *does* de-duplicate fully: any events generated by a subtransaction
that duplicate events already emitted by a parent will get removed when
the subxact is merged to its parent.  I did this partly because we have
to expend O(N) work to merge N subtransaction notifies in any case,
now that we have to make new hashtable entries in the parent xact;
so the old excuse that subxact-end processing is really cheap no
longer applies.  Also because the Assert(!found) assertions in the
attached hash coding fall over if we cheat on this.  If we really want
to maintain exactly the old semantics here, we could relax the hashtable
code to just ignore duplicate entries.  But, per the argument above,
de-duplication is a good thing so I'm inclined to keep it like this.

I also noticed that as things stand, it costs us two or three pallocs to
construct a Notification event.  It wouldn't be terribly hard to reduce
that to one palloc, and maybe it'd be worthwhile if we're thinking that
transactions with many many notifies are a case worth optimizing.
But I didn't do that here; it seems like a separable patch.

I also thought for awhile about not having the hashtable as an auxiliary
data structure, but making it the main data structure.  We could preserve
the required notification ordering information by threading the live
hashtable entries into an slist, say.  However, this would greatly
increase the overhead for transactions with just one or a few distinct
NOTIFY events, since we'd have to set up the hashtable at the first one.
I think that's a common enough case that we shouldn't de-optimize it.
A smaller objection is that such a data structure would absolutely commit
us to de-duplication semantics, whereas the list plus separate hashtable
can cope with not de-duping if someone persuades us that's sane.

Thoughts?

                        regards, tom lane

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e9c580..c21daa5 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -135,6 +135,7 @@
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/hashutils.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
@@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
 
 /*
  * State for outbound notifies consists of a list of all channels+payloads
- * NOTIFYed in the current transaction. We do not actually perform a NOTIFY
- * until and unless the transaction commits.  pendingNotifies is NIL if no
- * NOTIFYs have been done in the current transaction.
+ * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
+ * until and unless the transaction commits.  pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current (sub) transaction.
+ *
+ * We discard duplicate notify events issued in the same transaction.
+ * Hence, in addition to the list proper (which we need to track the order
+ * of the events, since we guarantee to deliver them in order), we build a
+ * hash table which we can probe to detect duplicates.  Since building the
+ * hash table is somewhat expensive, we do so only once we have at least
+ * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
+ * before that we just scan the events linearly.
  *
  * The list is kept in CurTransactionContext.  In subtransactions, each
  * subtransaction has its own list in its own CurTransactionContext, but
- * successful subtransactions attach their lists to their parent's list.
- * Failed subtransactions simply discard their lists.
+ * successful subtransactions add their entries to their parent's list.
+ * Failed subtransactions simply discard their lists.  Since these lists
+ * are independent, there may be notify events in a subtransaction's list
+ * that duplicate events in some ancestor (sub) transaction; we get rid of
+ * the dups when merging the subtransaction's list into its parent's.
  *
  * Note: the action and notify lists do not interact within a transaction.
  * In particular, if a transaction does NOTIFY and then LISTEN on the same
@@ -343,7 +355,20 @@ typedef struct Notification
 	char	   *payload;		/* payload string (can be empty) */
 } Notification;
 
-static List *pendingNotifies = NIL; /* list of Notifications */
+typedef struct NotificationList
+{
+	List	   *events;			/* list of Notification structs */
+	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
+} NotificationList;
+
+#define MIN_HASHABLE_NOTIFIES 16	/* threshold to build hashtab */
+
+typedef struct NotificationHash
+{
+	Notification *event;		/* => the actual Notification struct */
+} NotificationHash;
+
+static NotificationList *pendingNotifies = NULL;	/* current list, if any */
 
 static List *upperPendingNotifies = NIL;	/* list of upper-xact lists */
 
@@ -393,6 +418,9 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(void);
 static bool AsyncExistsPendingNotify(const char *channel, const char *payload);
+static void AddEventToPendingNotifies(Notification *n);
+static uint32 notification_hash(const void *key, Size keysize);
+static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
 /*
@@ -586,11 +614,19 @@ Async_Notify(const char *channel, const char *payload)
 	else
 		n->payload = "";
 
-	/*
-	 * We want to preserve the order so we need to append every notification.
-	 * See comments at AsyncExistsPendingNotify().
-	 */
-	pendingNotifies = lappend(pendingNotifies, n);
+	if (pendingNotifies == NULL)
+	{
+		/* First notify event in current (sub)xact */
+		pendingNotifies = (NotificationList *) palloc(sizeof(NotificationList));
+		pendingNotifies->events = list_make1(n);
+		/* We certainly don't need a hashtable yet */
+		pendingNotifies->hashtab = NULL;
+	}
+	else
+	{
+		/* Append more events to existing list */
+		AddEventToPendingNotifies(n);
+	}
 
 	MemoryContextSwitchTo(oldcontext);
 }
@@ -761,7 +797,7 @@ PreCommit_Notify(void)
 {
 	ListCell   *p;
 
-	if (pendingActions == NIL && pendingNotifies == NIL)
+	if (!pendingActions && !pendingNotifies)
 		return;					/* no relevant statements in this xact */
 
 	if (Trace_notify)
@@ -821,7 +857,7 @@ PreCommit_Notify(void)
 		/* Now push the notifications into the queue */
 		backendHasSentNotifications = true;
 
-		nextNotify = list_head(pendingNotifies);
+		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
 		{
 			/*
@@ -1294,7 +1330,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  * database OID in order to fill the page. So every page is always used up to
  * the last byte which simplifies reading the page later.
  *
- * We are passed the list cell (in pendingNotifies) containing the next
+ * We are passed the list cell (in pendingNotifies->events) containing the next
  * notification to write and return the first still-unwritten cell back.
  * Eventually we will return NULL indicating all is done.
  *
@@ -1345,7 +1381,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
 		if (offset + qe.length <= QUEUE_PAGESIZE)
 		{
 			/* OK, so advance nextNotify past this item */
-			nextNotify = lnext(pendingNotifies, nextNotify);
+			nextNotify = lnext(pendingNotifies->events, nextNotify);
 		}
 		else
 		{
@@ -1607,7 +1643,7 @@ AtSubStart_Notify(void)
 	Assert(list_length(upperPendingNotifies) ==
 		   GetCurrentTransactionNestLevel() - 1);
 
-	pendingNotifies = NIL;
+	pendingNotifies = NULL;
 
 	MemoryContextSwitchTo(old_cxt);
 }
@@ -1621,7 +1657,7 @@ void
 AtSubCommit_Notify(void)
 {
 	List	   *parentPendingActions;
-	List	   *parentPendingNotifies;
+	NotificationList *parentPendingNotifies;
 
 	parentPendingActions = linitial_node(List, upperPendingActions);
 	upperPendingActions = list_delete_first(upperPendingActions);
@@ -1634,16 +1670,41 @@ AtSubCommit_Notify(void)
 	 */
 	pendingActions = list_concat(parentPendingActions, pendingActions);
 
-	parentPendingNotifies = linitial_node(List, upperPendingNotifies);
+	parentPendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
 	upperPendingNotifies = list_delete_first(upperPendingNotifies);
 
 	Assert(list_length(upperPendingNotifies) ==
 		   GetCurrentTransactionNestLevel() - 2);
 
-	/*
-	 * We could try to eliminate duplicates here, but it seems not worthwhile.
-	 */
-	pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
+	if (pendingNotifies == NULL)
+	{
+		/* easy, no notify events happened in current subxact */
+		pendingNotifies = parentPendingNotifies;
+	}
+	else if (parentPendingNotifies == NULL)
+	{
+		/* easy, subxact's list becomes parent's */
+	}
+	else
+	{
+		/*
+		 * Formerly, we didn't need to eliminate duplicates here, but now we
+		 * must, else we fall foul of "Assert(!found)", either here or during
+		 * a later attempt to build the parent-level hashtable.
+		 */
+		NotificationList *childPendingNotifies = pendingNotifies;
+		ListCell   *l;
+
+		pendingNotifies = parentPendingNotifies;
+		/* Insert all the subxact's events into parent, except for dups */
+		foreach(l, childPendingNotifies->events)
+		{
+			Notification *childn = (Notification *) lfirst(l);
+
+			if (!AsyncExistsPendingNotify(childn->channel, childn->payload))
+				AddEventToPendingNotifies(childn);
+		}
+	}
 }
 
 /*
@@ -1672,7 +1733,7 @@ AtSubAbort_Notify(void)
 
 	while (list_length(upperPendingNotifies) > my_level - 2)
 	{
-		pendingNotifies = linitial_node(List, upperPendingNotifies);
+		pendingNotifies = (NotificationList *) linitial(upperPendingNotifies);
 		upperPendingNotifies = list_delete_first(upperPendingNotifies);
 	}
 }
@@ -2102,50 +2163,149 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
 static bool
 AsyncExistsPendingNotify(const char *channel, const char *payload)
 {
-	ListCell   *p;
-	Notification *n;
-
-	if (pendingNotifies == NIL)
+	if (pendingNotifies == NULL)
 		return false;
 
 	if (payload == NULL)
 		payload = "";
 
-	/*----------
-	 * We need to append new elements to the end of the list in order to keep
-	 * the order. However, on the other hand we'd like to check the list
-	 * backwards in order to make duplicate-elimination a tad faster when the
-	 * same condition is signaled many times in a row. So as a compromise we
-	 * check the tail element first which we can access directly. If this
-	 * doesn't match, we check the whole list.
-	 *
-	 * As we are not checking our parents' lists, we can still get duplicates
-	 * in combination with subtransactions, like in:
-	 *
-	 * begin;
-	 * notify foo '1';
-	 * savepoint foo;
-	 * notify foo '1';
-	 * commit;
-	 *----------
-	 */
-	n = (Notification *) llast(pendingNotifies);
-	if (strcmp(n->channel, channel) == 0 &&
-		strcmp(n->payload, payload) == 0)
-		return true;
-
-	foreach(p, pendingNotifies)
+	if (pendingNotifies->hashtab != NULL)
 	{
-		n = (Notification *) lfirst(p);
-
-		if (strcmp(n->channel, channel) == 0 &&
-			strcmp(n->payload, payload) == 0)
+		/* Use the hash table to probe for a match */
+		Notification n;
+		Notification *k;
+
+		/* set up a dummy Notification struct */
+		n.channel = unconstify(char *, channel);
+		n.payload = unconstify(char *, payload);
+		k = &n;
+		/* ... and probe */
+		if (hash_search(pendingNotifies->hashtab,
+						&k,
+						HASH_FIND,
+						NULL))
 			return true;
 	}
+	else
+	{
+		/* Must scan the event list */
+		ListCell   *l;
+
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *n = (Notification *) lfirst(l);
+
+			if (strcmp(n->channel, channel) == 0 &&
+				strcmp(n->payload, payload) == 0)
+				return true;
+		}
+	}
 
 	return false;
 }
 
+/*
+ * Add a notification event to a pre-existing pendingNotifies list.
+ *
+ * Because pendingNotifies->events is already nonempty, this works
+ * correctly no matter what CurrentMemoryContext is.
+ */
+static void
+AddEventToPendingNotifies(Notification *n)
+{
+	Assert(pendingNotifies->events != NIL);
+
+	/* Create the hash table if it's time to */
+	if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
+		pendingNotifies->hashtab == NULL)
+	{
+		HASHCTL		hash_ctl;
+		ListCell   *l;
+
+		/* Create the hash table */
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(Notification *);
+		hash_ctl.entrysize = sizeof(NotificationHash);
+		hash_ctl.hash = notification_hash;
+		hash_ctl.match = notification_match;
+		hash_ctl.hcxt = CurTransactionContext;
+		pendingNotifies->hashtab =
+			hash_create("Pending Notifies",
+						256L,
+						&hash_ctl,
+						HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+
+		/* Insert all the already-existing events */
+		foreach(l, pendingNotifies->events)
+		{
+			Notification *oldn = (Notification *) lfirst(l);
+			NotificationHash *hentry;
+			bool		found;
+
+			hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+													  &oldn,
+													  HASH_ENTER,
+													  &found);
+			Assert(!found);
+			hentry->event = oldn;
+		}
+	}
+
+	/* Add new event to the list, in order */
+	pendingNotifies->events = lappend(pendingNotifies->events, n);
+
+	/* Add event to the hash table if needed */
+	if (pendingNotifies->hashtab != NULL)
+	{
+		NotificationHash *hentry;
+		bool		found;
+
+		hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
+												  &n,
+												  HASH_ENTER,
+												  &found);
+		Assert(!found);
+		hentry->event = n;
+	}
+}
+
+/*
+ * notification_hash: hash function for notification hash table
+ *
+ * The hash "keys" are pointers to Notification structs.
+ */
+static uint32
+notification_hash(const void *key, Size keysize)
+{
+	const Notification *k = *(const Notification *const *) key;
+	uint32		hashc;
+	uint32		hashp;
+
+	Assert(keysize == sizeof(Notification *));
+	/* We just XOR the hashes for the two strings */
+	hashc = DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+									(int) strlen((const char *) k->channel)));
+	hashp = DatumGetUInt32(hash_any((const unsigned char *) k->payload,
+									(int) strlen((const char *) k->payload)));
+	return hashc ^ hashp;
+}
+
+/*
+ * notification_match: match function to use with notification_hash
+ */
+static int
+notification_match(const void *key1, const void *key2, Size keysize)
+{
+	const Notification *k1 = *(const Notification *const *) key1;
+	const Notification *k2 = *(const Notification *const *) key2;
+
+	Assert(keysize == sizeof(Notification *));
+	if (strcmp(k1->channel, k2->channel) == 0 &&
+		strcmp(k1->payload, k2->payload) == 0)
+		return 0;				/* equal */
+	return 1;					/* not equal */
+}
+
 /* Clear the pendingActions and pendingNotifies lists. */
 static void
 ClearPendingActionsAndNotifies(void)
@@ -2158,5 +2318,5 @@ ClearPendingActionsAndNotifies(void)
 	 * pointers.
 	 */
 	pendingActions = NIL;
-	pendingNotifies = NIL;
+	pendingNotifies = NULL;
 }

Reply via email to