At 2012-11-15 10:11:58 -0500, robertmh...@gmail.com wrote: > > It could use a run through pgindent.
Done. > I think you want Assert(heap->size < heap->space). Of course. Thanks for catching that. > Project style is to omit braces for a single-line body. I've removed most of them. In the few cases where one branch of an if/else would have a one-line body and the other would not, I chose to rewrite the code to avoid the problem (because, like Andrew, I hate having to do that). I wasn't sure how to present the following: if (right_off < heap->size && heap->compare(&heap->nodes[node_off], &heap->nodes[right_off]) < 0) { /* swap with the larger child */ if (!swap_off || heap->compare(&heap->nodes[left_off], &heap->nodes[right_off]) < 0) { swap_off = right_off; } } …so I left it alone. If you want me to remove the inner braces and resubmit, despite the multi-line condition, let me know. I didn't know which header comments Álvaro meant I should remove, so I haven't done that either. I did remove the TESTBINHEAP stuff, though. I have attached the updated patch here. Thanks for your review. -- Abhijit
diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile index 98ce3d7..327a1bc 100644 --- a/src/backend/lib/Makefile +++ b/src/backend/lib/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/lib top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = ilist.o stringinfo.o +OBJS = ilist.o binaryheap.o stringinfo.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h new file mode 100644 index 0000000..54d20b2 --- /dev/null +++ b/src/include/lib/binaryheap.h @@ -0,0 +1,121 @@ +/* + * binaryheap.h + * + * A simple binary heap implementation + * + * Portions Copyright (c) 2012, PostgreSQL Global Development Group + * + * src/include/lib/binaryheap.h + */ + +#ifndef BINARYHEAP_H +#define BINARYHEAP_H + +/* + * This structure represents a single node in a binaryheap, and just + * holds two pointers. The heap management code doesn't care what is + * stored in a node (in particular, the key or value may be NULL), + * only that the comparator function can compare any two nodes. + */ + +typedef struct binaryheap_node +{ + void *key; + void *value; +} binaryheap_node; + +/* + * For a max-heap, the comparator must return: + * -1 iff a < b + * 0 iff a == b + * +1 iff a > b + * For a min-heap, the conditions are reversed. + */ +typedef int (*binaryheap_comparator) (binaryheap_node * a, binaryheap_node * b); + +/* + * binaryheap + * + * size how many nodes are currently in "nodes" + * space how many nodes can be stored in "nodes" + * comparator comparator to define the heap property + * nodes the first of a list of "space" nodes + */ +typedef struct binaryheap +{ + size_t size; + size_t space; + binaryheap_comparator compare; + binaryheap_node nodes[1]; +} binaryheap; + +/* + * binaryheap_allocate + * + * Returns a pointer to a newly-allocated heap that has the capacity to + * store the given number of nodes, with the heap property defined by + * the given comparator function. + */ +binaryheap * binaryheap_allocate(size_t capacity, binaryheap_comparator compare); + +/* + * binaryheap_free + * + * Releases memory used by the given binaryheap. + */ +void binaryheap_free(binaryheap * heap); + +/* + * binaryheap_add_unordered + * + * Adds the given key and value to the end of the heap's list of nodes + * in O(1) without preserving the heap property. This is a convenience + * to add elements quickly to a new heap. To obtain a valid heap, one + * must call binaryheap_build() afterwards. + */ +void binaryheap_add_unordered(binaryheap * heap, void *key, void *value); + +/* + * binaryheap_build + * + * Assembles a valid heap in O(n) from the nodes added by + * binaryheap_add_unordered(). Not needed otherwise. + */ +void binaryheap_build(binaryheap * heap); + +/* + * binaryheap_add + * + * Adds the given key and value to the heap in O(log n), while + * preserving the heap property. + */ +void binaryheap_add(binaryheap * heap, void *key, void *value); + +/* + * binaryheap_first + * + * Returns a pointer to the first (root, topmost) node in the heap + * without modifying the heap. Returns NULL if the heap is empty. + * Always O(1). + */ +binaryheap_node * binaryheap_first(binaryheap * heap); + +/* + * binaryheap_remove_first + * + * Removes the first (root, topmost) node in the heap and returns a + * pointer to it after rebalancing the heap. Returns NULL if the heap + * is empty. O(log n) worst case. + */ +binaryheap_node * binaryheap_remove_first(binaryheap * heap); + +/* + * binaryheap_replace_first + * + * Change the key and/or value of the first (root, topmost) node and + * ensure that the heap property is preserved. O(1) in the best case, + * or O(log n) if it must fall back to sifting the new node down. + */ +void binaryheap_replace_first(binaryheap * heap, void *newkey, void *newval); + +#endif /* BINARYHEAP_H */ diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c new file mode 100644 index 0000000..c6d00d5 --- /dev/null +++ b/src/backend/lib/binaryheap.c @@ -0,0 +1,330 @@ +/*------------------------------------------------------------------------- + * + * binaryheap.c + * A simple binary heap implementaion + * + * Portions Copyright (c) 2012, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/lib/binaryheap.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <math.h> + +#include "lib/binaryheap.h" + +static void sift_down(binaryheap * heap, size_t node_off); +static void sift_up(binaryheap * heap, size_t node_off); +static inline void swap_nodes(binaryheap * heap, size_t a, size_t b); + +/* + * binaryheap_allocate + * + * Returns a pointer to a newly-allocated heap that has the capacity to + * store the given number of nodes, with the heap property defined by + * the given comparator function. + */ +binaryheap * +binaryheap_allocate(size_t capacity, binaryheap_comparator compare) +{ + int sz = sizeof(binaryheap) + sizeof(binaryheap_node) * (capacity - 1); + + binaryheap *heap = palloc(sz); + + heap->compare = compare; + heap->space = capacity; + heap->size = 0; + + return heap; +} + +/* + * binaryheap_free + * + * Releases memory used by the given binaryheap. + */ +void +binaryheap_free(binaryheap * heap) +{ + pfree(heap); +} + +/* + * These utility functions return the offset of the left child, right + * child, and parent of the node at the given index, respectively. + * + * The heap is represented as an array of nodes, with the root node + * stored at index 0. The left child of node i is at index 2*i+1, and + * the right child at 2*i+2. The parent of node i is at index (i-1)/2. + */ + +static inline int +left_offset(size_t i) +{ + return 2 * i + 1; +} + +static inline int +right_offset(size_t i) +{ + return 2 * i + 2; +} + +static inline int +parent_offset(size_t i) +{ + return floor((i - 1) / 2); +} + +/* + * binaryheap_add_unordered + * + * Adds the given key and value to the end of the heap's list of nodes + * in O(1) without preserving the heap property. This is a convenience + * to add elements quickly to a new heap. To obtain a valid heap, one + * must call binaryheap_build() afterwards. + */ +void +binaryheap_add_unordered(binaryheap * heap, void *key, void *value) +{ + Assert(heap->size < heap->space); + heap->nodes[heap->size].key = key; + heap->nodes[heap->size].value = value; + heap->size++; +} + +/* + * binaryheap_build + * + * Assembles a valid heap in O(n) from the nodes added by + * binaryheap_add_unordered(). Not needed otherwise. + */ +void +binaryheap_build(binaryheap * heap) +{ + int i; + + for (i = parent_offset(heap->size - 1); i >= 0; i--) + sift_down(heap, i); +} + +/* + * binaryheap_add + * + * Adds the given key and value to the heap in O(log n), while + * preserving the heap property. + */ +void +binaryheap_add(binaryheap * heap, void *key, void *value) +{ + binaryheap_add_unordered(heap, key, value); + sift_up(heap, heap->size - 1); +} + +/* + * binaryheap_first + * + * Returns a pointer to the first (root, topmost) node in the heap + * without modifying the heap. Returns NULL if the heap is empty. + * Always O(1). + */ +binaryheap_node * +binaryheap_first(binaryheap * heap) +{ + if (!heap->size) + return NULL; + return &heap->nodes[0]; +} + +/* + * binaryheap_remove_first + * + * Removes the first (root, topmost) node in the heap and returns a + * pointer to it after rebalancing the heap. Returns NULL if the heap + * is empty. O(log n) worst case. + */ +binaryheap_node * +binaryheap_remove_first(binaryheap * heap) +{ + if (heap->size == 0) + return NULL; + + if (heap->size == 1) + { + heap->size--; + return &heap->nodes[0]; + } + + /* + * Swap the root and last nodes, decrease the size of the heap (i.e. + * remove the former root node) and sift the new root node down to its + * correct position. + */ + + swap_nodes(heap, 0, heap->size - 1); + + heap->size--; + sift_down(heap, 0); + return &heap->nodes[heap->size]; +} + +/* + * binaryheap_replace_first + * + * Change the key and/or value of the first (root, topmost) node and + * ensure that the heap property is preserved. O(1) in the best case, + * or O(log n) if it must fall back to sifting the new node down. + */ +void +binaryheap_replace_first(binaryheap * heap, void *key, void *val) +{ + int cmp; + size_t next_off = 0; + + if (key) + heap->nodes[0].key = key; + if (val) + heap->nodes[0].value = val; + + /* + * If the new root node is still larger than the largest of its children + * (of which there may be 0, 1, or 2), then the heap is still valid. + */ + + if (heap->size == 1) + return; + + next_off = 1; + if (heap->size > 2) + { + /* Two children: pick the larger one */ + cmp = heap->compare(&heap->nodes[2], &heap->nodes[1]); + if (cmp > 0) + next_off = 2; + } + + cmp = heap->compare(&heap->nodes[next_off], &heap->nodes[0]); + if (cmp < 0) + return; + + /* The child is larger. Swap and sift the new node down. */ + + swap_nodes(heap, 0, next_off); + sift_down(heap, next_off); +} + +/* + * Swap the contents of two nodes. + */ +static inline void +swap_nodes(binaryheap * heap, size_t a, size_t b) +{ + binaryheap_node swap; + + swap.value = heap->nodes[a].value; + swap.key = heap->nodes[a].key; + + heap->nodes[a].value = heap->nodes[b].value; + heap->nodes[a].key = heap->nodes[b].key; + + heap->nodes[b].key = swap.key; + heap->nodes[b].value = swap.value; +} + +/* + * Sift a node up to the highest position it can hold according to the + * comparator. + */ +static void +sift_up(binaryheap * heap, size_t node_off) +{ + while (true) + { + int cmp; + size_t parent_off; + + if (node_off == 0) + break; + + /* + * If this node is smaller than its parent, the heap condition is + * satisfied, and we're done. + */ + + parent_off = parent_offset(node_off); + cmp = heap->compare(&heap->nodes[node_off], + &heap->nodes[parent_off]); + if (cmp <= 0) + break; + + /* + * Otherwise, swap the node and its parent and go on to check the + * node's new parent. + */ + + swap_nodes(heap, node_off, parent_off); + node_off = parent_off; + } +} + +/* + * Sift a node down from its current position to satisfy the heap + * property. + */ +static void +sift_down(binaryheap * heap, size_t node_off) +{ + while (true) + { + size_t left_off = left_offset(node_off); + size_t right_off = right_offset(node_off); + size_t swap_off = 0; + + /* Is the left child larger than the parent? */ + + if (left_off < heap->size && + heap->compare(&heap->nodes[node_off], + &heap->nodes[left_off]) < 0) + { + swap_off = left_off; + } + + /* + * If not (note: only one child can violate the heap property after a + * change), is the right child larger? + */ + + if (right_off < heap->size && + heap->compare(&heap->nodes[node_off], + &heap->nodes[right_off]) < 0) + { + /* swap with the larger child */ + if (!swap_off || + heap->compare(&heap->nodes[left_off], + &heap->nodes[right_off]) < 0) + { + swap_off = right_off; + } + } + + /* + * If we didn't find anything to swap, the heap condition is + * satisfied, and we're done. + */ + + if (!swap_off) + break; + + /* + * Otherwise, swap the node with the child that violates the heap + * property; then go on to check its children. + */ + + swap_nodes(heap, swap_off, node_off); + node_off = swap_off; + } +} diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index fec07b8..d4911bd 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1100,10 +1100,8 @@ typedef struct AppendState * nkeys number of sort key columns * sortkeys sort keys in SortSupport representation * slots current output tuple of each subplan - * heap heap of active tuples (represented as array indexes) - * heap_size number of active heap entries + * heap heap of active tuples * initialized true if we have fetched first tuple from each subplan - * last_slot last subplan fetched from (which must be re-called) * ---------------- */ typedef struct MergeAppendState @@ -1114,10 +1112,8 @@ typedef struct MergeAppendState int ms_nkeys; SortSupport ms_sortkeys; /* array of length ms_nkeys */ TupleTableSlot **ms_slots; /* array of length ms_nplans */ - int *ms_heap; /* array of length ms_nplans */ - int ms_heap_size; /* current active length of ms_heap[] */ + struct binaryheap *ms_heap; /* binary heap of slot indices */ bool ms_initialized; /* are subplans started? */ - int ms_last_slot; /* last subplan slot we returned from */ } MergeAppendState; /* ---------------- diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index d5141ba..fa05ddb 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -41,6 +41,8 @@ #include "executor/execdebug.h" #include "executor/nodeMergeAppend.h" +#include "lib/binaryheap.h" + /* * It gets quite confusing having a heap array (indexed by integers) which * contains integers which index into the slots array. These typedefs try to @@ -49,9 +51,7 @@ typedef int SlotNumber; typedef int HeapPosition; -static void heap_insert_slot(MergeAppendState *node, SlotNumber new_slot); -static void heap_siftup_slot(MergeAppendState *node); -static int32 heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2); +static int heap_compare_slots(binaryheap_node * a, binaryheap_node * b); /* ---------------------------------------------------------------- @@ -88,7 +88,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) mergestate->ms_nplans = nplans; mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); - mergestate->ms_heap = (int *) palloc0(sizeof(int) * nplans); + mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots); /* * Miscellaneous initialization @@ -143,9 +143,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) /* * initialize to show we have not run the subplans yet */ - mergestate->ms_heap_size = 0; mergestate->ms_initialized = false; - mergestate->ms_last_slot = -1; return mergestate; } @@ -160,6 +158,7 @@ TupleTableSlot * ExecMergeAppend(MergeAppendState *node) { TupleTableSlot *result; + binaryheap_node *hn; SlotNumber i; if (!node->ms_initialized) @@ -172,7 +171,7 @@ ExecMergeAppend(MergeAppendState *node) { node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) - heap_insert_slot(node, i); + binaryheap_add(node->ms_heap, node, (void *) i); } node->ms_initialized = true; } @@ -184,19 +183,20 @@ ExecMergeAppend(MergeAppendState *node) * the logic a bit by doing this before returning from the prior call, * but it's better to not pull tuples until necessary.) */ - i = node->ms_last_slot; + hn = binaryheap_first(node->ms_heap); + i = (int) hn->value; node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) - heap_insert_slot(node, i); + binaryheap_replace_first(node->ms_heap, node, (void *) i); + else + (void) binaryheap_remove_first(node->ms_heap); } - if (node->ms_heap_size > 0) + hn = binaryheap_first(node->ms_heap); + if (hn) { - /* Return the topmost heap node, and sift up the remaining nodes */ - i = node->ms_heap[0]; + i = (int) hn->value; result = node->ms_slots[i]; - node->ms_last_slot = i; - heap_siftup_slot(node); } else { @@ -208,65 +208,15 @@ ExecMergeAppend(MergeAppendState *node) } /* - * Insert a new slot into the heap. The slot must contain a valid tuple. - */ -static void -heap_insert_slot(MergeAppendState *node, SlotNumber new_slot) -{ - SlotNumber *heap = node->ms_heap; - HeapPosition j; - - Assert(!TupIsNull(node->ms_slots[new_slot])); - - j = node->ms_heap_size++; /* j is where the "hole" is */ - while (j > 0) - { - int i = (j - 1) / 2; - - if (heap_compare_slots(node, new_slot, node->ms_heap[i]) >= 0) - break; - heap[j] = heap[i]; - j = i; - } - heap[j] = new_slot; -} - -/* - * Delete the heap top (the slot in heap[0]), and sift up. - */ -static void -heap_siftup_slot(MergeAppendState *node) -{ - SlotNumber *heap = node->ms_heap; - HeapPosition i, - n; - - if (--node->ms_heap_size <= 0) - return; - n = node->ms_heap_size; /* heap[n] needs to be reinserted */ - i = 0; /* i is where the "hole" is */ - for (;;) - { - int j = 2 * i + 1; - - if (j >= n) - break; - if (j + 1 < n && heap_compare_slots(node, heap[j], heap[j + 1]) > 0) - j++; - if (heap_compare_slots(node, heap[n], heap[j]) <= 0) - break; - heap[i] = heap[j]; - i = j; - } - heap[i] = heap[n]; -} - -/* * Compare the tuples in the two given slots. */ static int32 -heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2) +heap_compare_slots(binaryheap_node * a, binaryheap_node * b) { + MergeAppendState *node = (MergeAppendState *) a->key; + SlotNumber slot1 = (int) a->value; + SlotNumber slot2 = (int) b->value; + TupleTableSlot *s1 = node->ms_slots[slot1]; TupleTableSlot *s2 = node->ms_slots[slot2]; int nkey; @@ -291,7 +241,7 @@ heap_compare_slots(MergeAppendState *node, SlotNumber slot1, SlotNumber slot2) datum2, isNull2, sortKey); if (compare != 0) - return compare; + return -compare; } return 0; } @@ -347,7 +297,5 @@ ExecReScanMergeAppend(MergeAppendState *node) if (subnode->chgParam == NULL) ExecReScan(subnode); } - node->ms_heap_size = 0; node->ms_initialized = false; - node->ms_last_slot = -1; }
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers