At 2012-11-15 10:11:58 -0500, robertmh...@gmail.com wrote:
>
> It could use a run through pgindent.

Done.

> I think you want Assert(heap->size < heap->space).

Of course. Thanks for catching that.

> Project style is to omit braces for a single-line body.

I've removed most of them. In the few cases where one branch of an
if/else would have a one-line body and the other would not, I chose
to rewrite the code to avoid the problem (because, like Andrew, I
hate having to do that).

I wasn't sure how to present the following:

        if (right_off < heap->size &&
            heap->compare(&heap->nodes[node_off],
                          &heap->nodes[right_off]) < 0)
        {
            /* swap with the larger child */
            if (!swap_off ||
                heap->compare(&heap->nodes[left_off],
                              &heap->nodes[right_off]) < 0)
            {
                swap_off = right_off;
            }
        }

…so I left it alone. If you want me to remove the inner braces and
resubmit, despite the multi-line condition, let me know.

I didn't know which header comments Álvaro meant I should remove,
so I haven't done that either. I did remove the TESTBINHEAP stuff,
though.

I have attached the updated patch here. Thanks for your review.

-- Abhijit
diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 98ce3d7..327a1bc 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/lib
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = ilist.o stringinfo.o
+OBJS = ilist.o binaryheap.o stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk

diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
new file mode 100644
index 0000000..54d20b2
--- /dev/null
+++ b/src/include/lib/binaryheap.h
@@ -0,0 +1,121 @@
+/*
+ * binaryheap.h
+ *
+ * A simple binary heap implementation
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/lib/binaryheap.h
+ */
+
+#ifndef BINARYHEAP_H
+#define BINARYHEAP_H
+
+/*
+ * This structure represents a single node in a binaryheap, and just
+ * holds two pointers. The heap management code doesn't care what is
+ * stored in a node (in particular, the key or value may be NULL),
+ * only that the comparator function can compare any two nodes.
+ *
+ * The heap code copies and swaps these two pointers but never
+ * dereferences them; only the caller's comparator looks at what they
+ * point to.
+ */
+
+typedef struct binaryheap_node
+{
+	void	   *key;
+	void	   *value;
+}	binaryheap_node;
+
+/*
+ * For a max-heap, the comparator must return:
+ * -1 iff a < b
+ * 0 iff a == b
+ * +1 iff a > b
+ * For a min-heap, the conditions are reversed.
+ *
+ * The heap code only ever tests the sign of the result, so any negative
+ * or positive value serves just as well as -1 or +1.
+ */
+typedef int (*binaryheap_comparator) (binaryheap_node * a, binaryheap_node * b);
+
+/*
+ * binaryheap
+ *
+ *		size			how many nodes are currently in "nodes"
+ *		space			how many nodes can be stored in "nodes"
+ *		compare			comparator to define the heap property
+ *		nodes			the first of a list of "space" nodes
+ *
+ * "nodes" is declared with a single element but is really
+ * variable-length: binaryheap_allocate() sizes the allocation so that
+ * "space" nodes fit behind the header.
+ */
+typedef struct binaryheap
+{
+	size_t		size;
+	size_t		space;
+	binaryheap_comparator compare;
+	binaryheap_node nodes[1];
+}	binaryheap;
+
+/*
+ * binaryheap_allocate
+ *
+ * Returns a pointer to a newly-allocated heap that has the capacity to
+ * store the given number of nodes, with the heap property defined by
+ * the given comparator function.
+ *
+ * The heap starts out empty. It is obtained from palloc() as a single
+ * chunk, and its capacity is fixed: the heap never grows.
+ */
+binaryheap * binaryheap_allocate(size_t capacity, binaryheap_comparator compare);
+
+/*
+ * binaryheap_free
+ *
+ * Releases memory used by the given binaryheap. Only the heap itself is
+ * freed; whatever the nodes' key and value pointers refer to is left
+ * alone.
+ */
+void binaryheap_free(binaryheap * heap);
+
+/*
+ * binaryheap_add_unordered
+ *
+ * Adds the given key and value to the end of the heap's list of nodes
+ * in O(1) without preserving the heap property. This is a convenience
+ * to add elements quickly to a new heap. To obtain a valid heap, one
+ * must call binaryheap_build() afterwards.
+ *
+ * The heap must still have room for the new node; this is checked only
+ * by an Assert.
+ */
+void binaryheap_add_unordered(binaryheap * heap, void *key, void *value);
+
+/*
+ * binaryheap_build
+ *
+ * Assembles a valid heap in O(n) from the nodes added by
+ * binaryheap_add_unordered(). Not needed otherwise.
+ *
+ * It sifts down every node that has children, starting with the parent
+ * of the last node and working back to the root.
+ */
+void binaryheap_build(binaryheap * heap);
+
+/*
+ * binaryheap_add
+ *
+ * Adds the given key and value to the heap in O(log n), while
+ * preserving the heap property.
+ *
+ * The heap must still have room for the new node; this is checked only
+ * by an Assert.
+ */
+void binaryheap_add(binaryheap * heap, void *key, void *value);
+
+/*
+ * binaryheap_first
+ *
+ * Returns a pointer to the first (root, topmost) node in the heap
+ * without modifying the heap. Returns NULL if the heap is empty.
+ * Always O(1).
+ *
+ * The result points into the heap's own node array; it is not a copy,
+ * so what it shows can change the next time the heap is modified.
+ */
+binaryheap_node * binaryheap_first(binaryheap * heap);
+
+/*
+ * binaryheap_remove_first
+ *
+ * Removes the first (root, topmost) node in the heap and returns a
+ * pointer to it after rebalancing the heap. Returns NULL if the heap
+ * is empty. O(log n) worst case.
+ *
+ * The result points at the slot of the heap's node array that has just
+ * dropped out of the heap; the next node added to the heap overwrites
+ * that slot.
+ */
+binaryheap_node * binaryheap_remove_first(binaryheap * heap);
+
+/*
+ * binaryheap_replace_first
+ *
+ * Change the key and/or value of the first (root, topmost) node and
+ * ensure that the heap property is preserved. O(1) in the best case,
+ * or O(log n) if it must fall back to sifting the new node down.
+ *
+ * Passing NULL for newkey or newval leaves that pointer of the root
+ * node as it was. The heap must not be empty.
+ */
+void binaryheap_replace_first(binaryheap * heap, void *newkey, void *newval);
+
+#endif   /* BINARYHEAP_H */

diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c
new file mode 100644
index 0000000..c6d00d5
--- /dev/null
+++ b/src/backend/lib/binaryheap.c
@@ -0,0 +1,330 @@
+/*-------------------------------------------------------------------------
+ *
+ * binaryheap.c
+ *	  A simple binary heap implementation
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/lib/binaryheap.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "lib/binaryheap.h"
+
+static void sift_down(binaryheap * heap, size_t node_off);
+static void sift_up(binaryheap * heap, size_t node_off);
+static inline void swap_nodes(binaryheap * heap, size_t a, size_t b);
+
+/*
+ * binaryheap_allocate
+ *
+ * Returns a pointer to a newly-allocated heap that has the capacity to
+ * store the given number of nodes, with the heap property defined by
+ * the given comparator function.
+ *
+ * The header and the node array are one palloc'd chunk. The heap starts
+ * out empty; the node slots themselves are left uninitialized.
+ */
+binaryheap *
+binaryheap_allocate(size_t capacity, binaryheap_comparator compare)
+{
+	size_t		sz;
+	binaryheap *heap;
+
+	/*
+	 * Keep the size in a size_t so that it is never narrowed to int, and
+	 * measure from the start of "nodes" so that a capacity of zero does
+	 * not depend on (capacity - 1) wrapping around.
+	 */
+	sz = offsetof(binaryheap, nodes) + sizeof(binaryheap_node) * capacity;
+
+	heap = palloc(sz);
+
+	heap->compare = compare;
+	heap->space = capacity;
+	heap->size = 0;
+
+	return heap;
+}
+
+/*
+ * binaryheap_free
+ *
+ * Releases memory used by the given binaryheap.
+ *
+ * The heap header and its node array are a single allocation, so one
+ * pfree() releases both. The objects that the nodes' key and value
+ * pointers refer to are not touched.
+ */
+void
+binaryheap_free(binaryheap * heap)
+{
+	pfree(heap);
+}
+
+/*
+ * These utility functions return the offset of the left child, right
+ * child, and parent of the node at the given index, respectively.
+ *
+ * The heap is represented as an array of nodes, with the root node
+ * stored at index 0. The left child of node i is at index 2*i+1, and
+ * the right child at 2*i+2. The parent of node i is at index (i-1)/2.
+ */
+
+/*
+ * Offset of the left child of node i. The result is a size_t, like the
+ * argument, so that the offset is not narrowed to int on its way back
+ * to the caller.
+ */
+static inline size_t
+left_offset(size_t i)
+{
+	return 2 * i + 1;
+}
+
+/*
+ * Offset of the right child of node i, which sits at 2*i+2. The result
+ * is a size_t, like the argument, so that the offset is not narrowed to
+ * int on its way back to the caller.
+ */
+static inline size_t
+right_offset(size_t i)
+{
+	return 2 * i + 2;
+}
+
+/*
+ * Offset of the parent of node i, which sits at (i-1)/2.
+ *
+ * This is plain integer arithmetic: unsigned division already rounds
+ * down, so there is no need for floor() and no detour through double.
+ * The root has no parent; asking for it yields 0 instead of letting
+ * i - 1 wrap around.
+ */
+static inline size_t
+parent_offset(size_t i)
+{
+	if (i == 0)
+		return 0;
+
+	return (i - 1) / 2;
+}
+
+/*
+ * binaryheap_add_unordered
+ *
+ * Appends the given key and value as a new node after the current last
+ * node, in O(1), without restoring the heap property. Meant for filling
+ * a new heap quickly; binaryheap_build() must be called afterwards to
+ * turn the result into a valid heap.
+ *
+ * The heap must still have a free slot, which is checked by an Assert.
+ */
+void
+binaryheap_add_unordered(binaryheap * heap, void *key, void *value)
+{
+	binaryheap_node *slot;
+
+	Assert(heap->size < heap->space);
+
+	/* Claim the first unused slot and count it as part of the heap. */
+	slot = &heap->nodes[heap->size++];
+	slot->key = key;
+	slot->value = value;
+}
+
+/*
+ * binaryheap_build
+ *
+ * Assembles a valid heap in O(n) from the nodes added by
+ * binaryheap_add_unordered(). Not needed otherwise.
+ *
+ * Works on heaps of any size, including empty and single-node heaps,
+ * for which there is nothing to do.
+ */
+void
+binaryheap_build(binaryheap * heap)
+{
+	size_t		i;
+
+	/*
+	 * Exactly the nodes at offsets below size/2 have at least one child.
+	 * Sift each of them down, from the last one back to the root. The
+	 * index is unsigned, so test it before decrementing rather than
+	 * comparing against zero afterwards.
+	 */
+	for (i = heap->size / 2; i-- > 0;)
+		sift_down(heap, i);
+}
+
+/*
+ * binaryheap_add
+ *
+ * Inserts the given key and value as a new node, keeping the heap
+ * property intact. O(log n).
+ */
+void
+binaryheap_add(binaryheap * heap, void *key, void *value)
+{
+	/* The new node lands in the slot just past the current last node. */
+	size_t		new_off = heap->size;
+
+	binaryheap_add_unordered(heap, key, value);
+	sift_up(heap, new_off);
+}
+
+/*
+ * binaryheap_first
+ *
+ * Peeks at the first (root, topmost) node without changing the heap.
+ * Returns a pointer to that node inside the heap's own array, or NULL
+ * when the heap holds no nodes. Always O(1).
+ */
+binaryheap_node *
+binaryheap_first(binaryheap * heap)
+{
+	return (heap->size == 0) ? NULL : &heap->nodes[0];
+}
+
+/*
+ * binaryheap_remove_first
+ *
+ * Takes the first (root, topmost) node out of the heap, restores the
+ * heap property among the remaining nodes, and returns a pointer to the
+ * removed node. Returns NULL if the heap is empty. O(log n) worst case.
+ *
+ * The removed node is parked in the array slot just past the remaining
+ * nodes, and that slot is what the result points at.
+ */
+binaryheap_node *
+binaryheap_remove_first(binaryheap * heap)
+{
+	size_t		last_off;
+
+	if (heap->size == 0)
+		return NULL;
+
+	/* After shrinking, the old last offset is the slot being vacated. */
+	last_off = --heap->size;
+
+	/*
+	 * If other nodes remain, trade the root with the former last node and
+	 * sift the new root down to where it belongs. A heap that held only
+	 * the root needs no rearranging.
+	 */
+	if (last_off != 0)
+	{
+		swap_nodes(heap, 0, last_off);
+		sift_down(heap, 0);
+	}
+
+	return &heap->nodes[last_off];
+}
+
+/*
+ * binaryheap_replace_first
+ *
+ * Change the key and/or value of the first (root, topmost) node and
+ * ensure that the heap property is preserved. O(1) in the best case,
+ * or O(log n) if it must fall back to sifting the new node down.
+ *
+ * A NULL key or val leaves the corresponding pointer of the root node
+ * unchanged. The heap must not be empty.
+ */
+void
+binaryheap_replace_first(binaryheap * heap, void *key, void *val)
+{
+	/* There is no root to replace in an empty heap. */
+	Assert(heap->size > 0);
+
+	if (key)
+		heap->nodes[0].key = key;
+	if (val)
+		heap->nodes[0].value = val;
+
+	/*
+	 * Let sift_down() decide whether anything has to move. It compares
+	 * the root with its children (of which there may be 0, 1, or 2) and
+	 * stops at once if neither is larger, so a root that merely ties
+	 * with a child stays where it is. With a single node there is
+	 * nothing to compare against.
+	 */
+	if (heap->size > 1)
+		sift_down(heap, 0);
+}
+
+/*
+ * Exchange the key/value pairs stored at offsets a and b of the heap's
+ * node array.
+ */
+static inline void
+swap_nodes(binaryheap * heap, size_t a, size_t b)
+{
+	binaryheap_node held = heap->nodes[a];
+
+	heap->nodes[a] = heap->nodes[b];
+	heap->nodes[b] = held;
+}
+
+/*
+ * Sift a node up to the highest position it can hold according to the
+ * comparator.
+ *
+ * The node keeps trading places with its parent for as long as it
+ * compares larger than that parent.
+ */
+static void
+sift_up(binaryheap * heap, size_t node_off)
+{
+	/* The root has no parent, so reaching offset 0 ends the climb. */
+	while (node_off != 0)
+	{
+		size_t		up_off = parent_offset(node_off);
+
+		/*
+		 * A node that is not larger than its parent satisfies the heap
+		 * condition, and we're done.
+		 */
+		if (heap->compare(&heap->nodes[node_off],
+						  &heap->nodes[up_off]) <= 0)
+			return;
+
+		/* Otherwise move up one level and look at the new parent. */
+		swap_nodes(heap, node_off, up_off);
+		node_off = up_off;
+	}
+}
+
+/*
+ * Sift a node down from its current position to satisfy the heap
+ * property.
+ *
+ * At each level the node is compared with its children. If either child
+ * is larger, the node trades places with the larger of them and the
+ * walk continues from that child's old position. It ends as soon as
+ * neither child is larger, or the node has no children.
+ */
+static void
+sift_down(binaryheap * heap, size_t node_off)
+{
+	for (;;)
+	{
+		size_t		lchild = left_offset(node_off);
+		size_t		rchild = right_offset(node_off);
+		size_t		target = node_off;	/* node_off itself: no swap */
+
+		/* First candidate: a left child that is larger than the node. */
+		if (lchild < heap->size &&
+			heap->compare(&heap->nodes[node_off],
+						  &heap->nodes[lchild]) < 0)
+			target = lchild;
+
+		/*
+		 * Second candidate: a right child that is larger than the node.
+		 * It is chosen if the left child was not a candidate, or if it
+		 * is larger than the left child as well.
+		 */
+		if (rchild < heap->size &&
+			heap->compare(&heap->nodes[node_off],
+						  &heap->nodes[rchild]) < 0 &&
+			(target == node_off ||
+			 heap->compare(&heap->nodes[lchild],
+						   &heap->nodes[rchild]) < 0))
+			target = rchild;
+
+		/* Neither child is larger: the heap condition is satisfied. */
+		if (target == node_off)
+			return;
+
+		/* Trade places with the chosen child, then check its children. */
+		swap_nodes(heap, target, node_off);
+		node_off = target;
+	}
+}

diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index fec07b8..d4911bd 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1100,10 +1100,8 @@ typedef struct AppendState
  *		nkeys			number of sort key columns
  *		sortkeys		sort keys in SortSupport representation
  *		slots			current output tuple of each subplan
- *		heap			heap of active tuples (represented as array indexes)
- *		heap_size		number of active heap entries
+ *		heap			heap of active tuples
  *		initialized		true if we have fetched first tuple from each subplan
- *		last_slot		last subplan fetched from (which must be re-called)
  * ----------------
  */
 typedef struct MergeAppendState
@@ -1114,10 +1112,8 @@ typedef struct MergeAppendState
 	int			ms_nkeys;
 	SortSupport ms_sortkeys;	/* array of length ms_nkeys */
 	TupleTableSlot **ms_slots;	/* array of length ms_nplans */
-	int		   *ms_heap;		/* array of length ms_nplans */
-	int			ms_heap_size;	/* current active length of ms_heap[] */
+	struct binaryheap *ms_heap; /* binary heap of slot indices */
 	bool		ms_initialized; /* are subplans started? */
-	int			ms_last_slot;	/* last subplan slot we returned from */
 } MergeAppendState;
 
 /* ----------------

diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index d5141ba..fa05ddb 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -41,6 +41,8 @@
 #include "executor/execdebug.h"
 #include "executor/nodeMergeAppend.h"
 
+#include "lib/binaryheap.h"
+
 /*
  * It gets quite confusing having a heap array (indexed by integers) which
  * contains integers which index into the slots array. These typedefs try to
@@ -49,9 +51,7 @@
 typedef int SlotNumber;
 typedef int HeapPosition;
 
-static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot);
-static void heap_siftup_slot(MergeAppendState *node);
-static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2);
+static int	heap_compare_slots(binaryheap_node * a, binaryheap_node * b);
 
 
 /* ----------------------------------------------------------------
@@ -88,7 +88,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	mergestate->ms_nplans = nplans;
 
 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
-	mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans);
+	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots);
 
 	/*
 	 * Miscellaneous initialization
@@ -143,9 +143,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	/*
 	 * initialize to show we have not run the subplans yet
 	 */
-	mergestate->ms_heap_size = 0;
 	mergestate->ms_initialized = false;
-	mergestate->ms_last_slot = -1;
 
 	return mergestate;
 }
@@ -160,6 +158,7 @@ TupleTableSlot *
 ExecMergeAppend(MergeAppendState *node)
 {
 	TupleTableSlot *result;
+	binaryheap_node *hn;
 	SlotNumber	i;
 
 	if (!node->ms_initialized)
@@ -172,7 +171,7 @@ ExecMergeAppend(MergeAppendState *node)
 		{
 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
 			if (!TupIsNull(node->ms_slots[i]))
-				heap_insert_slot(node, i);
+				binaryheap_add(node->ms_heap, node, (void *) i);
 		}
 		node->ms_initialized = true;
 	}
@@ -184,19 +183,20 @@ ExecMergeAppend(MergeAppendState *node)
 		 * the logic a bit by doing this before returning from the prior call,
 		 * but it's better to not pull tuples until necessary.)
 		 */
-		i = node->ms_last_slot;
+		hn = binaryheap_first(node->ms_heap);
+		i = (int) hn->value;
 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
 		if (!TupIsNull(node->ms_slots[i]))
-			heap_insert_slot(node, i);
+			binaryheap_replace_first(node->ms_heap, node, (void *) i);
+		else
+			(void) binaryheap_remove_first(node->ms_heap);
 	}
 
-	if (node->ms_heap_size > 0)
+	hn = binaryheap_first(node->ms_heap);
+	if (hn)
 	{
-		/* Return the topmost heap node, and sift up the remaining nodes */
-		i = node->ms_heap[0];
+		i = (int) hn->value;
 		result = node->ms_slots[i];
-		node->ms_last_slot = i;
-		heap_siftup_slot(node);
 	}
 	else
 	{
@@ -208,65 +208,15 @@ ExecMergeAppend(MergeAppendState *node)
 }
 
 /*
- * Insert a new slot into the heap.  The slot must contain a valid tuple.
- */
-static void
-heap_insert_slot(MergeAppendState *node, SlotNumber new_slot)
-{
-	SlotNumber *heap = node->ms_heap;
-	HeapPosition j;
-
-	Assert(!TupIsNull(node->ms_slots[new_slot]));
-
-	j = node->ms_heap_size++;	/* j is where the "hole" is */
-	while (j > 0)
-	{
-		int			i = (j - 1) / 2;
-
-		if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0)
-			break;
-		heap[j] = heap[i];
-		j = i;
-	}
-	heap[j] = new_slot;
-}
-
-/*
- * Delete the heap top (the slot in heap[0]), and sift up.
- */
-static void
-heap_siftup_slot(MergeAppendState *node)
-{
-	SlotNumber *heap = node->ms_heap;
-	HeapPosition i,
-				n;
-
-	if (--node->ms_heap_size <= 0)
-		return;
-	n = node->ms_heap_size;		/* heap[n] needs to be reinserted */
-	i = 0;						/* i is where the "hole" is */
-	for (;;)
-	{
-		int			j = 2 * i + 1;
-
-		if (j >= n)
-			break;
-		if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0)
-			j++;
-		if (heap_compare_slots(node, heap[n], heap[j]) <= 0)
-			break;
-		heap[i] = heap[j];
-		i = j;
-	}
-	heap[i] = heap[n];
-}
-
-/*
  * Compare the tuples in the two given slots.
  */
 static int32
-heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
+heap_compare_slots(binaryheap_node * a, binaryheap_node * b)
 {
+	MergeAppendState *node = (MergeAppendState *) a->key;
+	SlotNumber	slot1 = (int) a->value;
+	SlotNumber	slot2 = (int) b->value;
+
 	TupleTableSlot *s1 = node->ms_slots[slot1];
 	TupleTableSlot *s2 = node->ms_slots[slot2];
 	int			nkey;
@@ -291,7 +241,7 @@ heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2)
 									  datum2, isNull2,
 									  sortKey);
 		if (compare != 0)
-			return compare;
+			return -compare;
 	}
 	return 0;
 }
@@ -347,7 +297,5 @@ ExecReScanMergeAppend(MergeAppendState *node)
 		if (subnode->chgParam == NULL)
 			ExecReScan(subnode);
 	}
-	node->ms_heap_size = 0;
 	node->ms_initialized = false;
-	node->ms_last_slot = -1;
 }
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to