diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index f222c6c20d1..7033dfb1aa8 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/lib
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = binaryheap.o bipartite_match.o hyperloglog.o ilist.o knapsack.o \
+OBJS = binaryheap.o bipartite_match.o dht.o hyperloglog.o ilist.o knapsack.o \
        pairingheap.o rbtree.o stringinfo.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/lib/dht.c b/src/backend/lib/dht.c
new file mode 100644
index 00000000000..2fec70f7742
--- /dev/null
+++ b/src/backend/lib/dht.c
@@ -0,0 +1,1365 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.c
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * This is an open hashing hash table, with a linked list at each table
+ * entry.  It supports dynamic resizing, as required to prevent the linked
+ * lists from growing too long on average.  Currently, only growing is
+ * supported: the hash table never becomes smaller.
+ *
+ * To deal with currency, it has a fixed size set of partitions, each of which
+ * is independently locked.  Each bucket maps to a partition; so insert, find
+ * and iterate operations normally only acquire one lock.  Therefore, good
+ * concurrency is achieved whenever they don't collide at the lock partition
+ * level.  However, when a resize operation begins, all partition locks must
+ * be acquired simultaneously for a brief period.  This is only expected to
+ * happen a small number of times until a stable size is found, since growth is
+ * geometric.
+ *
+ * Resizing is done incrementally so that no individual insert operation pays
+ * for the potentially large cost of splitting all buckets.  A resize
+ * operation begins when any partition exceeds a certain load factor, and
+ * splits every bucket into two new buckets but doesn't yet transfer any items
+ * into the new buckets.  Then, future dht_release operations involving an
+ * exclusive lock transfer one item from an old bucket into the appropriate
+ * new bucket.  If possible, an item from the already-locked partition is
+ * migrated.  If none remain, a lock on another partition will be acquired and
+ * one item from that partition will be migrated.  This continues until there
+ * are no items left in old buckets, and the resize operation is finished.
+ * While a resize is in progress, find, insert and delete operations must look
+ * in a new bucket and an old bucket, but this is not too expensive since they
+ * are covered by the same lock.  That is, a given hash value maps to the same
+ * lock partition, no matter how many times buckets have been split.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/lib/dht.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "lib/dht.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "utils/dsa.h"
+#include "utils/memutils.h"
+
+/*
+ * An item in the hash table.  This wraps the user's entry object in an
+ * envelop that holds a pointer back to the bucket and a pointer to the next
+ * item in the bucket.
+ */
+struct dht_hash_table_item
+{
+	/* The hashed key, to avoid having to recompute it. */
+	dht_hash	hash;
+	/* The next item in the same bucket. */
+	dsa_pointer next;
+	/* The user's entry object follows here. */
+	char		entry[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* The number of partitions for locking purposes. */
+#define DHT_NUM_PARTITIONS_LOG2 7
+#define DHT_NUM_PARTITIONS (1 << DHT_NUM_PARTITIONS_LOG2)
+
+/* A magic value used to identify DHT hash tables. */
+#define DHT_MAGIC 0x75ff6a20
+
+/*
+ * Tracking information for each lock partition.  Initially, each partition
+ * corresponds to one bucket, but each time the hash table grows, the buckets
+ * covered by each partition split so the number of buckets covered doubles.
+ *
+ * We might want to add padding here so that each partition is on a different
+ * cache line, but doing so would bloat this structure considerably.
+ */
+typedef struct
+{
+	LWLock		lock;			/* Protects all buckets in this partition. */
+	size_t		count;			/* # of items in active buckets */
+	size_t		old_count;		/* # of items in old buckets */
+	Index		next_bucket_to_split;	/* do_increment_resize progress
+										 * tracker */
+} dht_partition;
+
+/*
+ * The head object for a hash table.  This will be stored in dynamic shared
+ * memory.
+ */
+typedef struct
+{
+	dht_hash_table_handle handle;
+	uint32		magic;
+	dht_partition partitions[DHT_NUM_PARTITIONS];
+	int			lwlock_tranche_id;
+
+	/*
+	 * The following members are written to only when ALL partitions locks are
+	 * held.  They can be read when any one partition lock is held.
+	 */
+
+	/* Number of buckets expressed as power of 2 (8 = 256 buckets). */
+	size_t		size_log2;		/* log2(number of buckets) */
+	dsa_pointer buckets;		/* current bucket array */
+	dsa_pointer old_buckets;	/* previous bucket array, if splitting */
+}	dht_hash_table_control;
+
+/*
+ * Per-backend state for a dynamic hash table.
+ */
+struct dht_hash_table
+{
+	dsa_area   *area;			/* Backing dynamic shared memory area. */
+	dht_parameters params;		/* Parameters. */
+	dht_hash_table_control *control;	/* Control object in DSM. */
+	dsa_pointer *buckets;		/* Current bucket pointers in DSM. */
+	size_t		size_log2;		/* log2(number of buckets) */
+	dsa_pointer *old_buckets;	/* Old bucket pointers in DSM. */
+	bool		exclusively_locked;		/* Is an exclusive partition lock
+										 * held? */
+};
+
+/* Given a pointer to an entry, find the pointer to the item that holds it. */
+#define ITEM_FROM_ENTRY(entry)										\
+	((dht_hash_table_item *)((char *)entry -						\
+							 offsetof(dht_hash_table_item, entry)))
+
+/* How many resize operations (bucket splits) have there been? */
+#define NUM_SPLITS(size_log2)					\
+	(size_log2 - DHT_NUM_PARTITIONS_LOG2)
+
+/* How many buckets are there in each partition at a given size? */
+#define BUCKETS_PER_PARTITION(size_log2)		\
+	(1 << NUM_SPLITS(size_log2))
+
+/* Max entries before we need to grow.  Half + quarter = 75% load factor. */
+#define MAX_COUNT_PER_PARTITION(hash_table)				\
+	(BUCKETS_PER_PARTITION(hash_table->size_log2) / 2 + \
+	 BUCKETS_PER_PARTITION(hash_table->size_log2) / 4)
+
+/* Choose partition based on the highest order bits of the hash. */
+#define PARTITION_FOR_HASH(hash)										\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - DHT_NUM_PARTITIONS_LOG2))
+
+/*
+ * Find the bucket index for a given hash and table size.  Each time the table
+ * doubles in size, the appropriate bucket for a given hash value doubles and
+ * possibly adds one, depending on the newly revealed bit, so that all buckets
+ * are split.
+ */
+#define BUCKET_INDEX_FOR_HASH_AND_SIZE(hash, size_log2)		\
+	(hash >> ((sizeof(dht_hash) * CHAR_BIT) - (size_log2)))
+
+/* The index of the first bucket in a given partition. */
+#define BUCKET_INDEX_FOR_PARTITION(partition, size_log2)	\
+	((partition) << NUM_SPLITS(size_log2))
+
+/* The head of the active bucket for a given hash value (lvalue). */
+#define BUCKET_FOR_HASH(hash_table, hash)								\
+	(hash_table->buckets[												\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2)])
+
+/* The head of the old bucket for a given hash value (lvalue). */
+#define OLD_BUCKET_FOR_HASH(hash_table, hash)							\
+	(hash_table->old_buckets[											\
+		BUCKET_INDEX_FOR_HASH_AND_SIZE(hash,							\
+									   hash_table->size_log2 - 1)])
+
+/* Is a resize currently in progress? */
+#define RESIZE_IN_PROGRESS(hash_table)			\
+	(hash_table->old_buckets != NULL)
+
+static void delete_item(dht_hash_table *hash_table, dht_hash_table_item * item);
+static void advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next);
+static dsa_pointer next_in_bucket(dht_iterator *iterator);
+static void begin_resize(dht_hash_table *hash_table, size_t new_size);
+static void finish_resize(dht_hash_table *hash_table);
+static void do_incremental_resize(dht_hash_table *hash_table, int p);
+static void search_for_resize_work(dht_hash_table *hash_table,
+					   int start_partition);
+static inline void ensure_valid_bucket_pointers(dht_hash_table *hash_table);
+static inline dht_hash_table_item *find_in_bucket(dht_hash_table *hash_table,
+			   const void *key,
+			   dsa_pointer item_pointer);
+static void insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket);
+static dht_hash_table_item *insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket);
+static bool delete_key_from_bucket(dht_hash_table *hash_table, const void *key,
+					   dsa_pointer *bucket_head);
+static bool delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head);
+
+#define PARTITION_LOCK(hash_table, i)			\
+	(&(hash_table)->control->partitions[(i)].lock)
+
+/*
+ * Create a new hash table backed by the given dynamic shared area, with the
+ * given parameters.
+ */
+dht_hash_table *
+dht_create(dsa_area *area, const dht_parameters *params)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Allocate the control object in shared memory. */
+	control = dsa_allocate(area, sizeof(dht_hash_table_control));
+
+	/* Set up the local and shared hash table structs. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->control->handle = control;
+	hash_table->control->magic = DHT_MAGIC;
+	hash_table->control->lwlock_tranche_id = params->tranche_id;
+
+	/* Set up the array of lock partitions. */
+	{
+		int			i;
+
+		for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		{
+			LWLockInitialize(PARTITION_LOCK(hash_table, i),
+							 hash_table->control->lwlock_tranche_id);
+			hash_table->control->partitions[i].count = 0;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+
+	/*
+	 * Set up the initial array of buckets.  Our initial size is the same as
+	 * the number of partitions.
+	 */
+	hash_table->control->size_log2 = DHT_NUM_PARTITIONS_LOG2;
+	hash_table->control->buckets =
+		dsa_allocate(area, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+	hash_table->buckets = dsa_get_address(area,
+										  hash_table->control->buckets);
+	hash_table->control->old_buckets = InvalidDsaPointer;
+	hash_table->old_buckets = NULL;
+	memset(hash_table->buckets, 0, sizeof(dsa_pointer) * DHT_NUM_PARTITIONS);
+
+	return hash_table;
+}
+
+/*
+ * Attach to an existing hash table using a handle.
+ */
+dht_hash_table *
+dht_attach(dsa_area *area, const dht_parameters *params,
+		   dht_hash_table_handle handle)
+{
+	dht_hash_table *hash_table;
+	dsa_pointer control;
+
+	/* Allocate the backend-local object representing the hash table. */
+	hash_table = palloc(sizeof(dht_hash_table));
+
+	/* Find the control object in shared memory. */
+	control = handle;
+
+	/* Set up the local hash table struct. */
+	hash_table->area = area;
+	hash_table->params = *params;
+	hash_table->control = dsa_get_address(area, control);
+	hash_table->exclusively_locked = false;
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	return hash_table;
+}
+
+/*
+ * Detach from a hash table.  This frees backend-local resources associated
+ * with the hash table, but the hash table will continue to exist until it is
+ * either explicitly destroyed (by a backend that is still attached to it), or
+ * the area that backs it is returned to the operating system.
+ */
+void
+dht_detach(dht_hash_table *hash_table)
+{
+	/* The hash table may have been destroyed.  Just free local memory. */
+	pfree(hash_table);
+}
+
+/*
+ * Destroy a hash table, returning all memory to the area.  The caller must be
+ * certain that no other backend will attempt to access the hash table before
+ * calling this function.  Other backend must explicitly call dht_detach to
+ * free up backend-local memory associated with the hash table.  The backend
+ * that calls dht_destroy must not call dht_detach.
+ */
+void
+dht_destroy(dht_hash_table *hash_table)
+{
+	dht_iterator iterator;
+	void	   *entry;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/* Free all the entries. */
+	dht_iterate_begin(hash_table, &iterator, true);
+	while ((entry = dht_iterate_next(&iterator)))
+		dht_iterate_delete(&iterator);
+	dht_iterate_end(&iterator);
+
+	/*
+	 * Vandalize the control block to help catch programming errors where
+	 * other backends access the memory formerly occupied by this hash table.
+	 */
+	hash_table->control->magic = 0;
+
+	/* Free the active and (if appropriate) old table, and control object. */
+	if (DsaPointerIsValid(hash_table->old_buckets))
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+	dsa_free(hash_table->area, hash_table->control->buckets);
+	dsa_free(hash_table->area, hash_table->control->handle);
+
+	pfree(hash_table);
+}
+
+/*
+ * Get a handle that can be used by other processes to attach to this hash
+ * table.
+ */
+dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	return hash_table->control->handle;
+}
+
+/*
+ * Look up an entry, given a key.  Returns a pointer to an entry if one can be
+ * found with the given key.  Returns NULL if the key is not found.  If a
+ * non-NULL value is returned, the entry is locked and must be released by
+ * calling dht_release.  If an error is raised before dht_release is called,
+ * the lock will be released automatically, but the caller must take care to
+ * ensure that the entry is not left corrupted.  The lock mode is either
+ * shared or exclusive depending on 'exclusive'.
+ *
+ * Note that the lock held is in fact an LWLock, so interrupts will be held
+ * on return from this function, and not resumed until dht_release is called.
+ * It is a very good idea for the caller to release the lock quickly.
+ */
+void *
+dht_find(dht_hash_table *hash_table, const void *key, bool exclusive)
+{
+	size_t		hash;
+	size_t		partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition = PARTITION_FOR_HASH(hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition),
+				  exclusive ? LW_EXCLUSIVE : LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (!item)
+	{
+		/* Not found. */
+		LWLockRelease(PARTITION_LOCK(hash_table, partition));
+		return NULL;
+	}
+	else
+	{
+		/* The caller will free the lock by calling dht_release. */
+		hash_table->exclusively_locked = exclusive;
+		return &item->entry;
+	}
+}
+
+/*
+ * Returns a pointer to an exclusively locked item which must be released with
+ * dht_release.  If the key is found in the hash table, 'found' is set to true
+ * and a pointer to the existing entry is returned.  If the key is not found,
+ * 'found' is set to false, and a pointer to a newly created entry is related.
+ */
+void *
+dht_find_or_insert(dht_hash_table *hash_table,
+				   const void *key,
+				   bool *found)
+{
+	size_t		hash;
+	size_t		partition_index;
+	dht_partition *partition;
+	dht_hash_table_item *item;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+	partition_index = PARTITION_FOR_HASH(hash);
+	partition = &hash_table->control->partitions[partition_index];
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+restart:
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition_index),
+				  LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/* Search the active bucket. */
+	item = find_in_bucket(hash_table, key, BUCKET_FOR_HASH(hash_table, hash));
+
+	/* If not found and a resize is in progress, search the old bucket too. */
+	if (!item && RESIZE_IN_PROGRESS(hash_table))
+		item = find_in_bucket(hash_table, key,
+							  OLD_BUCKET_FOR_HASH(hash_table, hash));
+
+	if (item)
+		*found = true;
+	else
+	{
+		*found = false;
+
+		/* Check if we are getting too full. */
+		if (partition->count > MAX_COUNT_PER_PARTITION(hash_table) &&
+			!RESIZE_IN_PROGRESS(hash_table))
+		{
+			/*
+			 * The load factor (= keys / buckets) for all buckets protected by
+			 * this partition is > 0.75.  Presumably the same applies
+			 * generally across the whole hash table (though we don't attempt
+			 * to track that directly to avoid contention on some kind of
+			 * central counter; we just assume that this partition is
+			 * representative).  This is a good time to start a new resize
+			 * operation, if there isn't one already in progress.
+			 *
+			 * Give up our existing lock first, because resizing needs to
+			 * reacquire all the locks in the right order.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+			begin_resize(hash_table, hash_table->size_log2 + 1);
+
+			goto restart;
+		}
+
+		/* Finally we can try to insert the new item. */
+		item = insert_into_bucket(hash_table, key,
+								  &BUCKET_FOR_HASH(hash_table, hash));
+		item->hash = hash;
+		/* Adjust per-lock-partition counter for load factor knowledge. */
+		++partition->count;
+	}
+
+	/* The caller must release the lock with dht_release. */
+	hash_table->exclusively_locked = true;
+	return &item->entry;
+}
+
+/*
+ * Remove an entry by key.  Returns true if the key was found and the
+ * corresponding entry was removed.
+ *
+ * To delete an entry that you already have a pointer to, see
+ * dht_delete_entry.
+ */
+bool
+dht_delete_key(dht_hash_table *hash_table, const void *key)
+{
+	size_t		hash;
+	size_t		partition;
+	bool		found;
+
+	hash = hash_table->params.hash_function(key, hash_table->params.key_size);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(!hash_table->exclusively_locked);
+
+	partition = PARTITION_FOR_HASH(hash);
+
+	LWLockAcquire(PARTITION_LOCK(hash_table, partition), LW_EXCLUSIVE);
+	ensure_valid_bucket_pointers(hash_table);
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_key_from_bucket(hash_table, key,
+							   &BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].count;
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_key_from_bucket(hash_table, key,
+									&OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		Assert(hash_table->control->partitions[partition].old_count > 0);
+		found = true;
+		--hash_table->control->partitions[partition].old_count;
+	}
+	else
+		found = false;
+
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+
+	return found;
+}
+
+/*
+ * Remove an entry.  The entry must already be exclusively locked, and must
+ * have been obtained by dht_find or dht_find_or_insert.  Note that this
+ * function releases the lock just like dht_release.
+ *
+ * To delete an entry by key, see dht_delete_key.  To delete an entry while
+ * iterating, see dht_iterator_delete.
+ */
+void
+dht_delete_entry(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	size_t		partition = PARTITION_FOR_HASH(item->hash);
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+	Assert(hash_table->exclusively_locked);
+
+	delete_item(hash_table, item);
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition));
+}
+
+/*
+ * Unlock an entry which was locked by dht_find or dht_find_or_insert.
+ */
+void
+dht_release(dht_hash_table *hash_table, void *entry)
+{
+	dht_hash_table_item *item = ITEM_FROM_ENTRY(entry);
+	size_t		partition_index = PARTITION_FOR_HASH(item->hash);
+	bool		deferred_resize_work = false;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	/*
+	 * If there is a resize in progress and we took an exclusive lock, try to
+	 * migrate a key from the old bucket array to the new bucket array.
+	 * Ideally, we'll find some work to do in the partition on which we
+	 * already hold a lock, but if not, we'll make a note to search other lock
+	 * partitions after releasing the lock we currently hold.
+	 */
+	if (RESIZE_IN_PROGRESS(hash_table) && hash_table->exclusively_locked)
+	{
+		dht_partition *partition;
+
+		partition = &hash_table->control->partitions[partition_index];
+
+		if (partition->old_count > 0)
+		{
+			/* Do some resizing work in this partition. */
+			do_incremental_resize(hash_table, partition_index);
+		}
+		else
+		{
+			/* Nothing left to do in this partition.  Look elsewhere. */
+			deferred_resize_work = true;
+		}
+	}
+
+	hash_table->exclusively_locked = false;
+	LWLockRelease(PARTITION_LOCK(hash_table, partition_index));
+
+	/*
+	 * If we need to contribute some work to the ongoing resize effort, and we
+	 * didn't find something to migrate in our own partition, go search for
+	 * such work in other partitions.  We'll start with the one after ours.
+	 */
+	if (deferred_resize_work)
+		search_for_resize_work(hash_table,
+							   (partition_index + 1) % DHT_NUM_PARTITIONS);
+}
+
+/*
+ * Begin iterating through the whole hash table.  The caller must supply a
+ * dht_iterator object, which can then be used to call dht_iterate_next to get
+ * values until the end is reached.
+ */
+void
+dht_iterate_begin(dht_hash_table *hash_table,
+				  dht_iterator *iterator,
+				  bool exclusive)
+{
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	iterator->hash_table = hash_table;
+	iterator->exclusive = exclusive;
+	iterator->partition = 0;
+	iterator->bucket = 0;
+	iterator->item = NULL;
+	iterator->last_item_pointer = InvalidDsaPointer;
+	iterator->locked = false;
+
+	/* Snapshot the size (arbitrary lock to prevent size changing). */
+	LWLockAcquire(PARTITION_LOCK(hash_table, 0), LW_SHARED);
+	ensure_valid_bucket_pointers(hash_table);
+	iterator->table_size_log2 = hash_table->size_log2;
+	LWLockRelease(PARTITION_LOCK(hash_table, 0));
+}
+
+/*
+ * Delete the entry that was most recently returned by dht_iterate_next.  The
+ * iterator must have been initialized in exclusive lock mode.
+ */
+void
+dht_iterate_delete(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	Assert(iterator->exclusive);
+	Assert(iterator->item != NULL);
+	delete_item(iterator->hash_table, iterator->item);
+	iterator->item = NULL;
+}
+
+/*
+ * Move to the next item in the hash table.  Returns a pointer to an entry, or
+ * NULL if the end of the hash table has been reached.  The item is locked in
+ * exclusive or shared mode depending on the argument given to
+ * dht_iterate_begin.  The caller can optionally release the lock by calling
+ * dht_iterate_release, and then call dht_iterate_next again to move to the
+ * next entry.  If the iteration is in exclusive mode, client code can also
+ * call dht_iterate_delete.  When the end of the hash table is reached, or at
+ * any time, the client may call dht_iterate_end to abandon iteration.
+ */
+void *
+dht_iterate_next(dht_iterator *iterator)
+{
+	dsa_pointer item_pointer;
+
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+
+	while (iterator->partition < DHT_NUM_PARTITIONS)
+	{
+		if (!iterator->locked)
+		{
+			LWLockAcquire(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition),
+						  iterator->exclusive ? LW_EXCLUSIVE : LW_SHARED);
+			iterator->locked = true;
+		}
+
+		item_pointer = next_in_bucket(iterator);
+		if (DsaPointerIsValid(item_pointer))
+		{
+			/* Remember this item, so that we can step over it next time. */
+			iterator->last_item_pointer = item_pointer;
+			iterator->item = dsa_get_address(iterator->hash_table->area,
+											 item_pointer);
+			return &(iterator->item->entry);
+		}
+
+		/* We have reached the end of the bucket. */
+		iterator->last_item_pointer = InvalidDsaPointer;
+		iterator->bucket += 1;
+
+		/*
+		 * If the last bucket was split while we were iterating, then we have
+		 * been doing some expensive bucket merging to get each item.  Now
+		 * that we've completed that whole pre-split bucket (by merging all
+		 * its descendants), we can update the table size of our iterator and
+		 * start iterating cheaply again.
+		 */
+		iterator->bucket <<=
+			iterator->hash_table->size_log2 - iterator->table_size_log2;
+		iterator->table_size_log2 = iterator->hash_table->size_log2;
+		/* Have we gone past the last bucket in this partition? */
+		if (iterator->bucket >=
+			BUCKET_INDEX_FOR_PARTITION(iterator->partition + 1,
+									   iterator->table_size_log2))
+		{
+			/* No more buckets.  Try next partition. */
+			LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition));
+			iterator->locked = false;
+			++iterator->partition;
+			iterator->bucket =
+				BUCKET_INDEX_FOR_PARTITION(iterator->partition,
+										   iterator->table_size_log2);
+		}
+	}
+	iterator->item = NULL;
+	return NULL;
+}
+
+/*
+ * Release the most recently obtained lock.  This can optionally be called in
+ * between calls to dht_iterator_next to allow other processes to access the
+ * same partition of the hash table.
+ */
+void
+dht_iterate_release(dht_iterator *iterator)
+{
+	Assert(iterator->locked);
+	LWLockRelease(PARTITION_LOCK(iterator->hash_table, iterator->partition));
+	iterator->locked = false;
+}
+
+/*
+ * Terminate iteration.  This must be called after iteration completes,
+ * whether or not the end was reached.  The iterator object may then be reused
+ * for another iteration.
+ */
+void
+dht_iterate_end(dht_iterator *iterator)
+{
+	Assert(iterator->hash_table->control->magic == DHT_MAGIC);
+	if (iterator->locked)
+		LWLockRelease(PARTITION_LOCK(iterator->hash_table,
+									 iterator->partition));
+}
+
+/*
+ * Print out debugging information about the internal state of the hash table.
+ */
+void
+dht_dump(dht_hash_table *hash_table)
+{
+	size_t		i;
+	size_t		j;
+
+	Assert(hash_table->control->magic == DHT_MAGIC);
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_SHARED);
+
+	ensure_valid_bucket_pointers(hash_table);
+
+	fprintf(stderr,
+			"hash table size = %zu\n", (size_t) 1 << hash_table->size_log2);
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+		size_t		begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2);
+		size_t		end = BUCKET_INDEX_FOR_PARTITION(i + 1, hash_table->size_log2);
+
+		fprintf(stderr, "  partition %zu\n", i);
+		fprintf(stderr,
+				"    active buckets (key count = %zu)\n", partition->count);
+
+		for (j = begin; j < end; ++j)
+		{
+			size_t		count = 0;
+			dsa_pointer bucket = hash_table->buckets[j];
+
+			while (DsaPointerIsValid(bucket))
+			{
+				dht_hash_table_item *item;
+
+				item = dsa_get_address(hash_table->area, bucket);
+
+				bucket = item->next;
+				++count;
+			}
+			fprintf(stderr, "      bucket %zu (key count = %zu)\n", j, count);
+		}
+		if (RESIZE_IN_PROGRESS(hash_table))
+		{
+			size_t		begin;
+			size_t		end;
+
+			begin = BUCKET_INDEX_FOR_PARTITION(i, hash_table->size_log2 - 1);
+			end = BUCKET_INDEX_FOR_PARTITION(i + 1,
+											 hash_table->size_log2 - 1);
+
+			fprintf(stderr, "    old buckets (key count = %zu)\n",
+					partition->old_count);
+
+			for (j = begin; j < end; ++j)
+			{
+				size_t		count = 0;
+				dsa_pointer bucket = hash_table->old_buckets[j];
+
+				while (DsaPointerIsValid(bucket))
+				{
+					dht_hash_table_item *item;
+
+					item = dsa_get_address(hash_table->area, bucket);
+
+					bucket = item->next;
+					++count;
+				}
+				fprintf(stderr,
+						"      bucket %zu (key count = %zu)\n", j, count);
+			}
+		}
+	}
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+}
+
+/*
+ * Delete a locked item to which we have a pointer.
+ */
+static void
+delete_item(dht_hash_table *hash_table, dht_hash_table_item * item)
+{
+	size_t		hash = item->hash;
+	size_t		partition = PARTITION_FOR_HASH(hash);
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, partition)));
+
+	/*
+	 * Try the active bucket first, and then the outgoing bucket if we are
+	 * resizing.  It can't be in both.
+	 */
+	if (delete_item_from_bucket(hash_table, item,
+								&BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		--hash_table->control->partitions[partition].count;
+		Assert(hash_table->control->partitions[partition].count >= 0);
+	}
+	else if (RESIZE_IN_PROGRESS(hash_table) &&
+			 delete_item_from_bucket(hash_table, item,
+									 &OLD_BUCKET_FOR_HASH(hash_table, hash)))
+	{
+		--hash_table->control->partitions[partition].old_count;
+		Assert(hash_table->control->partitions[partition].old_count >= 0);
+	}
+	else
+	{
+		Assert(false);
+	}
+}
+
+/*
+ * Find the item that comes after iterator->last_item_pointer but before
+ * *next, and write it into *next.
+ *
+ * Calling this on several buckets allows us to 'merge' the items from a
+ * bucket with its ancestors and descendents during resize operations, and
+ * iterate through the items in all of them in a stable order, despite items
+ * being able to move while we iterate.
+ *
+ * This works because buckets are sorted by descending item pointer.
+ */
+static void
+advance_iterator(dht_iterator *iterator, dsa_pointer bucket_head,
+				 dsa_pointer *next)
+{
+	dht_hash_table_item *item;
+
+	while (DsaPointerIsValid(bucket_head))
+	{
+		item = dsa_get_address(iterator->hash_table->area,
+							   bucket_head);
+		if ((!DsaPointerIsValid(iterator->last_item_pointer) ||
+			 bucket_head < iterator->last_item_pointer) &&
+			bucket_head > *next &&
+			BUCKET_INDEX_FOR_HASH_AND_SIZE(item->hash,
+										   iterator->table_size_log2) ==
+			iterator->bucket)
+		{
+			*next = bucket_head;
+			return;
+		}
+		bucket_head = item->next;
+	}
+}
+
+/*
+ * Find the next item in the bucket we are visiting.  If a resize is in
+ * progress or the bucket has been split since we started iterating over this
+ * bucket, we may need to combine items from multiple buckets to synthesize
+ * the original bucket.
+ */
+static dsa_pointer
+next_in_bucket(dht_iterator *iterator)
+{
+	size_t		bucket;
+	size_t		begin;
+	size_t		end;
+	dsa_pointer next = InvalidDsaPointer;
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(iterator->hash_table,
+										 iterator->partition)));
+
+	/* Handle old buckets if a resize is in progress. */
+	if (RESIZE_IN_PROGRESS(iterator->hash_table))
+	{
+		if (iterator->table_size_log2 == iterator->hash_table->size_log2)
+		{
+			/*
+			 * The table hasn't grown since we started iterating, but there
+			 * was already a resize in progress so there may be items in an
+			 * old bucket that belong in the bucket we're iterating over which
+			 * haven't been migrated yet.
+			 */
+			advance_iterator(iterator,
+					 iterator->hash_table->old_buckets[iterator->bucket / 2],
+							 &next);
+		}
+		else
+		{
+			/*
+			 * The bucket has been split one or more times, and a resize is
+			 * still in progress so there may be items still in an old bucket.
+			 * If N splits have occurred, there will be 2^N current buckets
+			 * that correspond to the one original bucket.
+			 */
+			Assert(iterator->hash_table->size_log2 > iterator->table_size_log2);
+			begin = iterator->bucket <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			end = (iterator->bucket + 1) <<
+				(iterator->hash_table->size_log2 - iterator->table_size_log2 - 1);
+			for (bucket = begin; bucket < end; bucket += 1)
+				advance_iterator(iterator,
+								 iterator->hash_table->old_buckets[bucket],
+								 &next);
+		}
+	}
+
+	/*
+	 * Scan the bucket.  This is a loop because it may have split any number
+	 * of times since we started iterating through it, but in the common case
+	 * we just loop once.
+	 */
+	begin = iterator->bucket <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	end = (iterator->bucket + 1) <<
+		(iterator->hash_table->size_log2 - iterator->table_size_log2);
+	for (bucket = begin; bucket < end; ++bucket)
+		advance_iterator(iterator,
+						 iterator->hash_table->buckets[iterator->bucket],
+						 &next);
+
+	return next;
+}
+
+/*
+ * Grow the hash table if necessary to the requested number of buckets.  The
+ * requested size must be double some previously observed size.  Returns true
+ * if the table was successfully expanded or found to be big enough already
+ * (because another backend expanded it).  Returns false for out-of-memory.
+ * The process of transferring items from the old table to the new one will be
+ * carried out with a small tax on all future operations until the work is
+ * done.
+ */
+static void
+begin_resize(dht_hash_table *hash_table, size_t new_size_log2)
+{
+	int			i;
+	dsa_pointer new_buckets;
+
+	/*
+	 * Acquire the locks for all lock partitions.  This is expensive, but we
+	 * shouldn't have to do it many times.
+	 */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0 && hash_table->control->size_log2 >= new_size_log2)
+		{
+			/*
+			 * Another backend has already increased the size; we can avoid
+			 * obtaining all the locks and return early.
+			 */
+			LWLockRelease(PARTITION_LOCK(hash_table, 0));
+			return;
+		}
+	}
+
+	Assert(new_size_log2 == hash_table->control->size_log2 + 1);
+	Assert(!RESIZE_IN_PROGRESS(hash_table));
+
+	/* Allocate the space for the new table. */
+	new_buckets = dsa_allocate(hash_table->area,
+							   sizeof(dsa_pointer) * (1 << new_size_log2));
+	hash_table->control->old_buckets = hash_table->control->buckets;
+	hash_table->control->buckets = new_buckets;
+	hash_table->control->size_log2 = new_size_log2;
+	hash_table->buckets = dsa_get_address(hash_table->area,
+										  hash_table->control->buckets);
+	hash_table->old_buckets =
+		dsa_get_address(hash_table->area,
+						hash_table->control->old_buckets);
+	hash_table->size_log2 = new_size_log2;
+	/* InvalidDsaPointer in every bucket */
+	memset(hash_table->buckets, 0,
+		   sizeof(dsa_pointer) * (1 << new_size_log2));
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		partition->old_count = partition->count;
+		partition->count = 0;
+		partition->next_bucket_to_split =
+			BUCKET_INDEX_FOR_PARTITION(i, new_size_log2 - 1);
+	}
+
+	/* Release all the locks. */
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+		LWLockRelease(PARTITION_LOCK(hash_table, i));
+}
+
+/*
+ * Check if the resize operation is finished.  If it is, then the old table
+ * can be returned to the shared memory area.
+ */
+static void
+finish_resize(dht_hash_table *hash_table)
+{
+	int			i,
+				j;
+	bool		finished = true;
+
+	for (i = 0; i < DHT_NUM_PARTITIONS; ++i)
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+		if (i == 0)
+		{
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* Another backend has already finished the resize. */
+				LWLockRelease(PARTITION_LOCK(hash_table, i));
+				return;
+			}
+		}
+
+		/*
+		 * We can only actually finish the resize when every partition has no
+		 * more keys in old buckets.  This might not be true yet, because
+		 * search_for_resize_work calls us when it thinks there might be no
+		 * more resizing work to do, but it could be wrong because it judges
+		 * that with an unsynchronized view of memory.  Now that we can
+		 * examine the synchronized counters, we can check if the resize
+		 * really is complete.
+		 */
+		if (partition->old_count > 0)
+		{
+			finished = false;
+			break;
+		}
+	}
+
+	if (finished)
+	{
+		dsa_free(hash_table->area, hash_table->control->old_buckets);
+		hash_table->control->old_buckets = InvalidDsaPointer;
+	}
+
+	/*
+	 * Only release as many locks as we acquired.  When finished is true, this
+	 * will be all of them, but if we exited the lock-acquire loop early it
+	 * might be fewer.
+	 */
+	for (j = 0; j < i; ++j)
+		LWLockRelease(PARTITION_LOCK(hash_table, j));
+}
+
+/*
+ * Migrate one item from partition 'p' from an old bucket to the appropriate
+ * new bucket.  The partition lock must already be held and there must be at
+ * least one item to be migrated.
+ */
+static void
+do_incremental_resize(dht_hash_table *hash_table, int p)
+{
+	size_t		old_size_log2 = hash_table->control->size_log2 - 1;
+	dht_partition *partition = &hash_table->control->partitions[p];
+
+	Assert(LWLockHeldByMe(PARTITION_LOCK(hash_table, p)));
+	Assert(partition->old_count > 0);
+
+	/*
+	 * Walk through all the buckets that are covered by this partition until
+	 * we find one that is not empty, and migrate one item from it.
+	 */
+	for (;;)
+	{
+		dsa_pointer item_pointer;
+
+		item_pointer =
+			hash_table->old_buckets[partition->next_bucket_to_split];
+
+		if (DsaPointerIsValid(item_pointer))
+		{
+			dht_hash_table_item *item;
+
+			item = dsa_get_address(hash_table->area, item_pointer);
+
+			/* Remove it from the old bucket. */
+			hash_table->old_buckets[partition->next_bucket_to_split] =
+				item->next;
+			--partition->old_count;
+
+			/* Add it to the active bucket. */
+			insert_item_into_bucket(hash_table,
+									item_pointer,
+									item,
+									&BUCKET_FOR_HASH(hash_table, item->hash));
+			++partition->count;
+
+			return;
+		}
+		else
+		{
+			/* Move to the next bucket protected by this partition. */
+			partition->next_bucket_to_split += 1;
+
+			/*
+			 * We shouldn't be able to reach past the end of the partition
+			 * without finding an item.  That would imply that the partition's
+			 * old_count was corrupted.
+			 *
+			 * This is an elog() because we want to catch this even in builds
+			 * that are not Assert-enabled.
+			 */
+			if (partition->next_bucket_to_split >=
+				BUCKET_INDEX_FOR_PARTITION(p + 1, old_size_log2))
+				elog(ERROR, "dht corrupted");
+		}
+	}
+}
+
+/*
+ * Try to find a partition that has outstanding resizing work and do some of
+ * that work, possibly completing the resize operation.  The caller must hold
+ * no locks.
+ *
+ * Consider 'start_partition' first.  dht_release tries to spread this around
+ * to try to reduce contention.
+ */
+static void
+search_for_resize_work(dht_hash_table *hash_table, int start_partition)
+{
+	int			i = start_partition;
+
+	Assert(start_partition >= 0);
+	Assert(start_partition < DHT_NUM_PARTITIONS);
+
+	/*
+	 * Using a non-synchronized view of memory (no locks), see if we can find
+	 * any partition that looks like it still has keys to be migrated.
+	 */
+	do
+	{
+		dht_partition *partition = &hash_table->control->partitions[i];
+
+		if (partition->old_count > 0)
+		{
+			bool		done = false;
+
+			/* Now check again with a lock. */
+			LWLockAcquire(PARTITION_LOCK(hash_table, i), LW_EXCLUSIVE);
+			ensure_valid_bucket_pointers(hash_table);
+			if (!RESIZE_IN_PROGRESS(hash_table))
+			{
+				/* The resize finished underneath us. */
+				done = true;
+			}
+			else if (partition->old_count > 0)
+			{
+				/* There is in fact work to be done in this partition. */
+				do_incremental_resize(hash_table, i);
+				done = true;
+			}
+			LWLockRelease(PARTITION_LOCK(hash_table, i));
+
+			if (done)
+				return;
+		}
+		i = (i + 1) % DHT_NUM_PARTITIONS;
+	} while (i != start_partition);
+
+	/*
+	 * We didn't find any partitions that seem to have remaining migration
+	 * work, though we can't be 100% sure due to unsynchronized reads of
+	 * memory.  We will try to finish the resize, which may or may not
+	 * actually succeed, depending on what it finds the true state to be.
+	 */
+	finish_resize(hash_table);
+}
+
+/*
+ * Make sure that our backend-local bucket pointers are up to date.  The
+ * caller must have locked one lock partition, which prevents begin_resize()
+ * from running concurrently.
+ */
+static inline void
+ensure_valid_bucket_pointers(dht_hash_table *hash_table)
+{
+	if (hash_table->size_log2 != hash_table->control->size_log2)
+	{
+		/*
+		 * A resize has started since we last looked, so our pointers are out
+		 * of date.
+		 */
+		hash_table->buckets = dsa_get_address(hash_table->area,
+											  hash_table->control->buckets);
+		if (DsaPointerIsValid(hash_table->control->old_buckets))
+			hash_table->old_buckets =
+				dsa_get_address(hash_table->area,
+								hash_table->control->old_buckets);
+		else
+			hash_table->old_buckets = NULL;
+		hash_table->size_log2 = hash_table->control->size_log2;
+	}
+	if (hash_table->old_buckets &&
+		!DsaPointerIsValid(hash_table->control->old_buckets))
+	{
+		/* A resize has finished since we last looked. */
+		hash_table->old_buckets = NULL;
+	}
+}
+
+/*
+ * Scan a locked bucket for a match, using the provided compare-function.
+ */
+static inline dht_hash_table_item *
+find_in_bucket(dht_hash_table *hash_table, const void *key,
+			   dsa_pointer item_pointer)
+{
+	while (DsaPointerIsValid(item_pointer))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, item_pointer);
+		if (hash_table->params.compare_function(key, &item->entry,
+										   hash_table->params.key_size) == 0)
+			return item;
+		item_pointer = item->next;
+	}
+	return NULL;
+}
+
+/*
+ * Insert an already-allocated item into a bucket.
+ */
+static void
+insert_item_into_bucket(dht_hash_table *hash_table,
+						dsa_pointer item_pointer,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket)
+{
+	Assert(item == dsa_get_address(hash_table->area, item_pointer));
+
+	/*
+	 * Find the insertion point for this item in the bucket.  Buckets are
+	 * sorted by descending dsa_pointer, as part of the protocol for ensuring
+	 * that iterators don't visit items twice when buckets are split during a
+	 * scan.  (We don't actually expect them to have more than 1 item unless
+	 * the hash function is of low quality.)
+	 */
+	while (*bucket > item_pointer)
+	{
+		dht_hash_table_item *this_item;
+
+		this_item = dsa_get_address(hash_table->area, *bucket);
+
+		bucket = &this_item->next;
+	}
+
+	item->next = *bucket;
+	*bucket = item_pointer;
+}
+
+/*
+ * Allocate space for an entry with the given key and insert it into the
+ * provided bucket.
+ */
+static dht_hash_table_item *
+insert_into_bucket(dht_hash_table *hash_table,
+				   const void *key,
+				   dsa_pointer *bucket)
+{
+	dsa_pointer item_pointer;
+	dht_hash_table_item *item;
+
+	item_pointer = dsa_allocate(hash_table->area,
+								hash_table->params.entry_size +
+								offsetof(dht_hash_table_item, entry));
+	item = dsa_get_address(hash_table->area, item_pointer);
+	memcpy(&(item->entry), key, hash_table->params.key_size);
+	insert_item_into_bucket(hash_table, item_pointer, item, bucket);
+	return item;
+}
+
+/*
+ * Search a bucket for a matching key and delete it.
+ */
+static bool
+delete_key_from_bucket(dht_hash_table *hash_table,
+					   const void *key,
+					   dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *item;
+
+		item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (hash_table->params.compare_function(key, &item->entry,
+												hash_table->params.key_size)
+			== 0)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &item->next;
+	}
+	return false;
+}
+
+/*
+ * Delete the specified item from the bucket.
+ */
+static bool
+delete_item_from_bucket(dht_hash_table *hash_table,
+						dht_hash_table_item * item,
+						dsa_pointer *bucket_head)
+{
+	while (DsaPointerIsValid(*bucket_head))
+	{
+		dht_hash_table_item *bucket_item;
+
+		bucket_item = dsa_get_address(hash_table->area, *bucket_head);
+
+		if (bucket_item == item)
+		{
+			dsa_pointer next;
+
+			next = item->next;
+			dsa_free(hash_table->area, *bucket_head);
+			*bucket_head = next;
+			return true;
+		}
+		bucket_head = &bucket_item->next;
+	}
+	return false;
+}
diff --git a/src/include/lib/dht.h b/src/include/lib/dht.h
new file mode 100644
index 00000000000..fea8c5f7c71
--- /dev/null
+++ b/src/include/lib/dht.h
@@ -0,0 +1,114 @@
+/*-------------------------------------------------------------------------
+ *
+ * dht.h
+ *	  Concurrent hash tables backed by dynamic shared memory areas.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/include/lib/dht.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DHT_H
+#define DHT_H
+
+#include "utils/dsa.h"
+
+/* The opaque type representing a hash table. */
+struct dht_hash_table;
+typedef struct dht_hash_table dht_hash_table;
+
+/* The opaque type used for tracking iterator state. */
+struct dht_iterator;
+typedef struct dht_iterator dht_iterator;
+
+/* A handle for a dht_hash_table which can be shared with other processes. */
+typedef dsa_pointer dht_hash_table_handle;
+
+/* The type for hash values. */
+typedef uint32 dht_hash;
+
+/*
+ * The function type for computing hash values for keys.  Compatible with
+ * HashValueFunc in hsearch.h and hash functions like tag_hash.
+ */
+typedef dht_hash (*dht_hash_function)(const void *v, Size size);
+
+/*
+ * The function used for comparing keys.  Compatible with HashCompareFunction
+ * in hsearch.h and memcmp.
+ */
+typedef int (*dht_compare_function)(const void *a, const void *b, Size size);
+
+/*
+ * The set of parameters needed to create or attach to a hash table.  The
+ * members tranche_id and tranche_name do not need to be initialized when
+ * attaching to an existing hash table.
+ */
+typedef struct
+{
+	Size key_size;				/* Size of the key (initial bytes of entry) */
+	Size entry_size;			/* Total size of entry */
+	dht_compare_function compare_function;	/* Compare function */
+	dht_hash_function hash_function;	/* Hash function */
+	int tranche_id;				/* The tranche ID to use for locks. */
+} dht_parameters;
+
+/* Forward declaration of private types for use only by dht.c. */
+struct dht_hash_table_bucket;
+struct dht_hash_table_item;
+typedef struct dht_hash_table_item dht_hash_table_item;
+typedef struct dht_hash_table_bucket dht_hash_table_bucket;
+
+/*
+ * The state used to track a walk over all entries in a hash table.  The
+ * members of this struct are only for use by code in dht.c, but it is
+ * included in the header because it's useful to be able to create objects of
+ * this type on the stack to pass into the dht_iterator_XXX functions.
+ */
+struct dht_iterator
+{
+	dht_hash_table *hash_table;	/* The hash table we are iterating over. */
+	bool exclusive;				/* Whether to lock buckets exclusively. */
+	Size partition;				/* The index of the next partition to visit. */
+	Size bucket;				/* The index of the next bucket to visit. */
+	dht_hash_table_item *item;  /* The most recently returned item. */
+	dsa_pointer last_item_pointer; /* The last item visited. */
+	Size table_size_log2; 	/* The table size when we started iterating. */
+	bool locked;			/* Whether the current partition is locked. */
+};
+
+/* Creating, sharing and destroying from hash tables. */
+extern dht_hash_table *dht_create(dsa_area *area,
+								  const dht_parameters *params);
+extern dht_hash_table *dht_attach(dsa_area *area,
+								  const dht_parameters *params,
+								  dht_hash_table_handle handle);
+extern void dht_detach(dht_hash_table *hash_table);
+extern dht_hash_table_handle
+dht_get_hash_table_handle(dht_hash_table *hash_table);
+extern void dht_destroy(dht_hash_table *hash_table);
+
+/* Iterating over the whole hash table. */
+extern void dht_iterate_begin(dht_hash_table *hash_table,
+							  dht_iterator *iterator, bool exclusive);
+extern void *dht_iterate_next(dht_iterator *iterator);
+extern void dht_iterate_delete(dht_iterator *iterator);
+extern void dht_iterate_release(dht_iterator *iterator);
+extern void dht_iterate_end(dht_iterator *iterator);
+
+/* Finding, creating, deleting entries. */
+extern void *dht_find(dht_hash_table *hash_table,
+					  const void *key, bool exclusive);
+extern void *dht_find_or_insert(dht_hash_table *hash_table,
+								const void *key, bool *found);
+extern bool dht_delete_key(dht_hash_table *hash_table, const void *key);
+extern void dht_delete_entry(dht_hash_table *hash_table, void *entry);
+extern void dht_release(dht_hash_table *hash_table, void *entry);
+
+/* Debugging support. */
+extern void dht_dump(dht_hash_table *hash_table);
+
+#endif   /* DHT_H */
