Index: include/pvfs2-sysint.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/include/pvfs2-sysint.h,v
retrieving revision 1.75
diff -u -a -p -r1.75 pvfs2-sysint.h
--- include/pvfs2-sysint.h	20 Sep 2007 15:09:21 -0000	1.75
+++ include/pvfs2-sysint.h	6 Feb 2008 00:38:25 -0000
@@ -172,7 +172,7 @@ typedef struct PVFS_sysresp_readdirplus_
 
 struct PVFS_sysresp_statfs_s
 {
-    PVFS_statfs statfs_buf;
+    struct PVFS_statfs statfs_buf;
     int32_t server_count; /* int32_t for portable, fixed size structure */
 };
 typedef struct PVFS_sysresp_statfs_s PVFS_sysresp_statfs;
Index: include/pvfs2-types.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/include/pvfs2-types.h,v
retrieving revision 1.147
diff -u -a -p -r1.147 pvfs2-types.h
--- include/pvfs2-types.h	19 Dec 2007 05:39:09 -0000	1.147
+++ include/pvfs2-types.h	6 Feb 2008 00:38:25 -0000
@@ -363,7 +363,7 @@ typedef struct
 #define PVFS_XATTR_REPLACE 0x2
 
 /** statfs and misc. server statistic information. */
-typedef struct
+struct PVFS_statfs
 {
     PVFS_fs_id fs_id;
     PVFS_size bytes_available;
@@ -374,10 +374,9 @@ typedef struct
     uint64_t load_5;
     uint64_t load_15;
     uint64_t uptime_seconds;
-    uint64_t handles_available_count;
     uint64_t handles_total_count;
-} PVFS_statfs;
-endecode_fields_12(
+};
+endecode_fields_11_struct(
     PVFS_statfs,
     skip4,,
     PVFS_fs_id, fs_id,
@@ -389,7 +388,6 @@ endecode_fields_12(
     uint64_t, load_5,
     uint64_t, load_15,
     uint64_t, uptime_seconds,
-    uint64_t, handles_available_count,
     uint64_t, handles_total_count);
 
 /** object reference (uniquely refers to a single file, directory, or
Index: src/apps/admin/pvfs2-statfs.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/apps/admin/pvfs2-statfs.c,v
retrieving revision 1.31
diff -u -a -p -r1.31 pvfs2-statfs.c
--- src/apps/admin/pvfs2-statfs.c	29 May 2006 16:16:36 -0000	1.31
+++ src/apps/admin/pvfs2-statfs.c	6 Feb 2008 00:38:25 -0000
@@ -96,8 +96,6 @@ int main(int argc, char **argv)
     printf("\tfs_id: %d\n", (int)resp_statfs.statfs_buf.fs_id);
     printf("\ttotal number of servers (meta and I/O): %d\n", 
         resp_statfs.server_count);
-    printf("\thandles available (meta and I/O):       %llu\n",
-           llu(resp_statfs.statfs_buf.handles_available_count));
     printf("\thandles total (meta and I/O):           %llu\n",
            llu(resp_statfs.statfs_buf.handles_total_count));
 
Index: src/client/sysint/mgmt-statfs-list.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/mgmt-statfs-list.sm,v
retrieving revision 1.43
diff -u -a -p -r1.43 mgmt-statfs-list.sm
--- src/client/sysint/mgmt-statfs-list.sm	30 Aug 2007 00:13:41 -0000	1.43
+++ src/client/sysint/mgmt-statfs-list.sm	6 Feb 2008 00:38:25 -0000
@@ -262,7 +262,7 @@ static int statfs_list_comp_fn(void *v_p
     {
 	struct PVFS_mgmt_server_stat *sm_stat =
 	    &sm_p->u.statfs_list.stat_array[i];
-	PVFS_statfs *resp_stat = &resp_p->u.statfs.stat;
+	struct PVFS_statfs *resp_stat = &resp_p->u.statfs.stat;
 
 	sm_stat->fs_id = resp_stat->fs_id;
 	sm_stat->bytes_available = resp_stat->bytes_available;
@@ -273,8 +273,6 @@ static int statfs_list_comp_fn(void *v_p
 	sm_stat->load_5 = resp_stat->load_5;
 	sm_stat->load_15 = resp_stat->load_15;
 	sm_stat->uptime_seconds = resp_stat->uptime_seconds;
-	sm_stat->handles_available_count =
-            resp_stat->handles_available_count;
 	sm_stat->handles_total_count =
             resp_stat->handles_total_count;
 
@@ -283,8 +281,6 @@ static int statfs_list_comp_fn(void *v_p
             sm_p->msgarray[i].svr_addr, &sm_stat->server_type);
 	assert(sm_stat->bmi_address);
 
-	assert(sm_stat->handles_total_count >=
-	       sm_stat->handles_available_count);
     }
  
     /* if this is the last response, check all of the status values
Index: src/client/sysint/sys-statfs.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/client/sysint/sys-statfs.sm,v
retrieving revision 1.7
diff -u -a -p -r1.7 sys-statfs.sm
--- src/client/sysint/sys-statfs.sm	30 Aug 2007 00:13:42 -0000	1.7
+++ src/client/sysint/sys-statfs.sm	6 Feb 2008 00:38:25 -0000
@@ -197,7 +197,6 @@ static PINT_sm_action sys_statfs_cleanup
     sm_p->u.statfs_list.resp->statfs_buf.fs_id = sm_p->u.statfs_list.fs_id;
     sm_p->u.statfs_list.resp->statfs_buf.bytes_available = 0;
     sm_p->u.statfs_list.resp->statfs_buf.bytes_total = 0;
-    sm_p->u.statfs_list.resp->statfs_buf.handles_available_count = 0;
     sm_p->u.statfs_list.resp->statfs_buf.handles_total_count = 0;
     for(i=0; i<sm_p->u.statfs_list.count; i++)
     {
@@ -215,8 +214,6 @@ static PINT_sm_action sys_statfs_cleanup
 		min_bytes_total = sm_p->u.statfs_list.stat_array[i].bytes_total;
 	    }
 	}
-	sm_p->u.statfs_list.resp->statfs_buf.handles_available_count 
-	    += sm_p->u.statfs_list.stat_array[i].handles_available_count;
 	sm_p->u.statfs_list.resp->statfs_buf.handles_total_count 
 	    += sm_p->u.statfs_list.stat_array[i].handles_total_count;
     }
Index: src/common/quicklist/quicklist.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/common/quicklist/quicklist.h,v
retrieving revision 1.5
diff -u -a -p -r1.5 quicklist.h
--- src/common/quicklist/quicklist.h	15 Sep 2006 16:11:00 -0000	1.5
+++ src/common/quicklist/quicklist.h	6 Feb 2008 00:38:25 -0000
@@ -190,4 +190,11 @@ static __inline__ void qlist_splice(stru
 	     &pos->member != (head); 					\
 	     pos = n, n = qlist_entry(n->member.next, typeof(*n), member))
 
+#define qlist_for_each_reverse(pos, head) \
+	for (pos = (head)->prev; pos != (head); pos = pos->prev)
+
+#define qlist_for_each_reverse_safe(pos, scratch, head) \
+    for(pos = (head)->prev, scratch = pos->prev; pos != (head); \
+	pos = scratch, scratch = pos->prev)
+
 #endif /* QUICKLIST_H */
Index: src/io/trove/trove-types.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-types.h,v
retrieving revision 1.33
diff -u -a -p -r1.33 trove-types.h
--- src/io/trove/trove-types.h	18 Oct 2006 16:01:11 -0000	1.33
+++ src/io/trove/trove-types.h	6 Feb 2008 00:38:25 -0000
@@ -44,7 +44,7 @@ typedef PVFS_ds_attributes         TROVE
 typedef PVFS_ds_storedattr         TROVE_ds_storedattr_s;
 typedef PVFS_error                 TROVE_ds_state;
 typedef PVFS_context_id            TROVE_context_id;
-typedef PVFS_statfs		   TROVE_statfs;
+typedef struct PVFS_statfs         TROVE_statfs;
 typedef PVFS_coll_getinfo_options  TROVE_coll_getinfo_options;
 typedef PVFS_object_ref            TROVE_object_ref;
 
Index: src/io/trove/trove-dbpf/dbpf-dspace.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-dspace.c,v
retrieving revision 1.152
diff -u -a -p -r1.152 dbpf-dspace.c
--- src/io/trove/trove-dbpf/dbpf-dspace.c	8 Nov 2007 21:48:22 -0000	1.152
+++ src/io/trove/trove-dbpf/dbpf-dspace.c	6 Feb 2008 00:38:25 -0000
@@ -31,6 +31,8 @@
 #include "dbpf-attr-cache.h"
 #include "dbpf-open-cache.h"
 
+#define MAX_HANDLE_ALLOC_ATTEMPTS 1024
+
 #define TROVE_DEFAULT_DB_PAGESIZE 512
 
 #ifdef __PVFS2_TROVE_THREADED__
@@ -176,6 +178,7 @@ static int dbpf_dspace_create(TROVE_coll
 static int dbpf_dspace_create_op_svc(struct dbpf_op *op_p)
 {
     int ret = -TROVE_EINVAL;
+    int attempts = 0;
     TROVE_ds_storedattr_s s_attr;
     TROVE_ds_attributes attr;
     TROVE_handle new_handle = TROVE_HANDLE_NULL;
@@ -188,90 +191,94 @@ static int dbpf_dspace_create_op_svc(str
 
     cur_extent = op_p->u.d_create.extent_array.extent_array[0];
 
-    /* check if we got a single specific handle */
-    if ((op_p->u.d_create.extent_array.extent_count == 1) &&
-        (cur_extent.first == cur_extent.last))
+    do
     {
-        /*
-          check if we MUST use the exact handle value specified;
-          if caller requests a specific handle, honor it
-        */
-        if (op_p->flags & TROVE_FORCE_REQUESTED_HANDLE)
+        /* check if we got a single specific handle */
+        if ((op_p->u.d_create.extent_array.extent_count == 1) &&
+            (cur_extent.first == cur_extent.last))
         {
-            /*
-              we should probably handle this error nicely;
-              right now, it will fail later (gracefully) if this
-              fails since the handle will already exist, but
-              since we know it here, handle it here ?
-            */
-            new_handle = cur_extent.first;
-            trove_handle_set_used(op_p->coll_p->coll_id, new_handle);
-            gossip_debug(GOSSIP_TROVE_DEBUG, "new_handle was FORCED "
-                         "to be %llu\n", llu(new_handle));
+            if (cur_extent.first == TROVE_HANDLE_NULL)
+            {
+                /*
+                   if we got TROVE_HANDLE_NULL, the caller doesn't care
+                   where the handle comes from
+                   */
+                new_handle = trove_handle_alloc(op_p->coll_p->coll_id);
+            }
+            else
+            {
+                /*
+                   we should probably handle this error nicely;
+                   right now, it will fail later (gracefully) if this
+                   fails since the handle will already exist, but
+                   since we know it here, handle it here ?
+                   */
+                new_handle = cur_extent.first;
+                gossip_debug(GOSSIP_TROVE_DEBUG, "new_handle was FORCED "
+                             "to be %llu\n", llu(new_handle));
+            }
         }
-        else if (cur_extent.first == TROVE_HANDLE_NULL)
+        else
         {
             /*
-              if we got TROVE_HANDLE_NULL, the caller doesn't care
-              where the handle comes from
-            */
-            new_handle = trove_handle_alloc(op_p->coll_p->coll_id);
+               otherwise, we have to try to allocate a handle from
+               the specified range that we're given
+               */
+            new_handle = trove_handle_alloc_from_range(
+                op_p->coll_p->coll_id, &op_p->u.d_create.extent_array);
         }
-    }
-    else
-    {
+
+        gossip_debug(GOSSIP_TROVE_DEBUG, "[%d extents] -- new_handle is %llu "
+                     "(cur_extent is %llu - %llu)\n",
+                     op_p->u.d_create.extent_array.extent_count,
+                     llu(new_handle), llu(cur_extent.first),
+                     llu(cur_extent.last));
         /*
-          otherwise, we have to try to allocate a handle from
-          the specified range that we're given
-        */
-        new_handle = trove_handle_alloc_from_range(
-            op_p->coll_p->coll_id, &op_p->u.d_create.extent_array);
-    }
-
-    gossip_debug(GOSSIP_TROVE_DEBUG, "[%d extents] -- new_handle is %llu "
-                 "(cur_extent is %llu - %llu)\n",
-                 op_p->u.d_create.extent_array.extent_count,
-                 llu(new_handle), llu(cur_extent.first),
-                 llu(cur_extent.last));
-    /*
-      if we got a zero handle, we're either completely out of handles
-      -- or else something terrible has happened
-    */
-    if (new_handle == TROVE_HANDLE_NULL)
-    {
-        gossip_err("Error: handle allocator returned a zero handle.\n");
-        ret = -TROVE_ENOSPC;
-        goto return_error;
-    }
+           if we got a zero handle, we're either completely out of handles
+           -- or else something terrible has happened
+           */
+        if (new_handle == TROVE_HANDLE_NULL)
+        {
+            gossip_err("Error: handle allocator returned a zero handle.\n");
+            ret = -TROVE_ENOSPC;
+            goto return_error;
+        }
 
-    memset(&s_attr, 0, sizeof(TROVE_ds_storedattr_s));
-    s_attr.type = op_p->u.d_create.type;
+        memset(&s_attr, 0, sizeof(TROVE_ds_storedattr_s));
+        s_attr.type = op_p->u.d_create.type;
 
-    memset(&key, 0, sizeof(key));
-    key.data = &new_handle;
-    key.size = key.ulen = sizeof(new_handle);
+        memset(&key, 0, sizeof(key));
+        key.data = &new_handle;
+        key.size = key.ulen = sizeof(new_handle);
 
-    memset(&data, 0, sizeof(data));
-    data.data = &s_attr;
-    data.size = data.ulen = sizeof(TROVE_ds_storedattr_s);
-    data.flags |= DB_DBT_USERMEM;
+        memset(&data, 0, sizeof(data));
+        data.data = &s_attr;
+        data.size = data.ulen = sizeof(TROVE_ds_storedattr_s);
+        data.flags |= DB_DBT_USERMEM;
 
-    /* check to see if handle is already used */
-    ret = op_p->coll_p->ds_db->get(op_p->coll_p->ds_db, NULL, &key, &data, 0);
-    if (ret == 0)
-    {
-        gossip_debug(GOSSIP_TROVE_DEBUG, "handle (%llu) already exists.\n",
-                     llu(new_handle));
-        ret = -TROVE_EEXIST;
-        goto return_error;
-    }
-    else if ((ret != DB_NOTFOUND) && (ret != DB_KEYEMPTY))
+        /* check to see if handle is already used */
+        ret = op_p->coll_p->ds_db->get(op_p->coll_p->ds_db, NULL, &key, &data, 0);
+        if (ret == 0 && op_p->flags & TROVE_FORCE_REQUESTED_HANDLE)
+        {
+            gossip_debug(GOSSIP_TROVE_DEBUG, "handle (%llu) already exists.\n",
+                         llu(new_handle));
+            ret = -TROVE_EEXIST;
+            goto return_error;
+        }
+        else if ((ret != DB_NOTFOUND) && (ret != DB_KEYEMPTY))
+        {
+            gossip_err("error in dspace create (db_p->get failed).\n");
+            ret = -dbpf_db_error_to_trove_error(ret);
+            goto return_error;
+        }
+    } while(ret != DB_NOTFOUND && ++attempts < MAX_HANDLE_ALLOC_ATTEMPTS);
+
+    if(ret != DB_NOTFOUND)
     {
-        gossip_err("error in dspace create (db_p->get failed).\n");
-        ret = -dbpf_db_error_to_trove_error(ret);
+        ret = -TROVE_ENOSPC;
         goto return_error;
     }
-    
+
     /* check for old bstream files (these should not exist, but it is
      * possible if the db gets out of sync with the rest of the collection
      * somehow
@@ -621,6 +628,25 @@ static int dbpf_dspace_iterate_handles_o
     op_p->u.d_iterate_handles.handle_array[i] = dummy_handle;
     ++i;
 
+    if(i == *op_p->u.d_iterate_handles.count_p)
+    {
+        /* handle single count case */
+        ret = dbc_p->c_get(dbc_p, &key, &data, DB_NEXT);
+        if (ret == DB_NOTFOUND)
+        {
+            goto return_ok;
+        }
+        else if (ret != 0)
+        {
+            ret = -dbpf_db_error_to_trove_error(ret);
+            gossip_err("failed to set cursor position at handle: %llu\n",
+                       llu(*(TROVE_handle *)op_p->u.d_iterate_handles.position_p));
+            goto return_error;
+        }
+        *op_p->u.d_iterate_handles.position_p = dummy_handle;
+        goto return_ok;
+    }
+
     start_size = ((sizeof(TROVE_handle) + sizeof(s_attr)) *
                   (*op_p->u.d_iterate_handles.count_p - 1));
     /* round up to the nearest 1024 */
@@ -915,21 +941,6 @@ static int dbpf_dspace_getattr(TROVE_col
     gen_mutex_lock(&dbpf_attr_cache_mutex);
     if (dbpf_attr_cache_ds_attr_fetch_cached_data(ref, ds_attr_p) == 0)
     {
-#if 0
-        gossip_debug(
-            GOSSIP_TROVE_DEBUG, "ATTRIB: retrieved "
-            "attributes from CACHE for key %llu\n  uid = %d, mode = %d, "
-            "type = %d, dfile_count = %d, dist_size = %d\n",
-            llu(handle), (int)ds_attr_p->uid, (int)ds_attr_p->mode,
-            (int)ds_attr_p->type, (int)ds_attr_p->dfile_count,
-            (int)ds_attr_p->dist_size);
-#endif
-        gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "dspace_getattr fast "
-                     "path attr cache hit on %llu\n (dfile_count=%d | "
-                     "dist_size=%d | data_size=%lld)\n", llu(handle),
-                     ds_attr_p->dfile_count, ds_attr_p->dist_size,
-                     lld(ds_attr_p->b_size));
-
         UPDATE_PERF_METADATA_READ();
         gen_mutex_unlock(&dbpf_attr_cache_mutex);
         return 1;
@@ -988,21 +999,6 @@ static int dbpf_dspace_getattr_list(TROV
 
         if (dbpf_attr_cache_ds_attr_fetch_cached_data(ref, &ds_attr_p[i]) == 0)
         {
-#if 0
-            gossip_debug(
-                GOSSIP_TROVE_DEBUG, "ATTRIB: retrieved "
-                "attributes from CACHE for key %llu\n  uid = %d, mode = %d, "
-                "type = %d, dfile_count = %d, dist_size = %d\n",
-                llu(handle), (int)ds_attr_p->uid, (int)ds_attr_p->mode,
-                (int)ds_attr_p->type, (int)ds_attr_p->dfile_count,
-                (int)ds_attr_p->dist_size);
-#endif
-            gossip_debug(GOSSIP_DBPF_ATTRCACHE_DEBUG, "dspace_getattr fast "
-                         "path attr cache hit on %llu\n (dfile_count=%d | "
-                         "dist_size=%d | data_size=%lld)\n", llu(handle_array[i]),
-                         ds_attr_p->dfile_count, ds_attr_p->dist_size,
-                         lld(ds_attr_p->b_size));
-
             UPDATE_PERF_METADATA_READ();
             error_array[i] = 0;
             continue;
@@ -1110,15 +1106,6 @@ static int dbpf_dspace_setattr_op_svc(st
 
     trove_ds_attr_to_stored((*op_p->u.d_setattr.attr_p), s_attr);
 
-#if 0
-    gossip_debug(GOSSIP_TROVE_DEBUG, "ATTRIB: dspace_setattr storing "
-                 "attributes (2) on key %llu\n uid = %d, mode = %d, "
-                 "type = %d, dfile_count = %d, dist_size = %d\n",
-                 llu(op_p->handle), (int) s_attr.uid, (int) s_attr.mode,
-                 (int) s_attr.type, (int) s_attr.dfile_count,
-                 (int) s_attr.dist_size);
-#endif
-
     ret = op_p->coll_p->ds_db->put(
         op_p->coll_p->ds_db, NULL, &key, &data, 0);
     if (ret != 0)
@@ -1196,14 +1183,6 @@ static int dbpf_dspace_getattr_op_svc(st
         goto return_error;
     }
 
-    gossip_debug(
-        GOSSIP_TROVE_DEBUG, "ATTRIB: retrieved attributes "
-        "from DISK for key %llu\n\tuid = %d, mode = %d, type = %d, "
-        "dfile_count = %d, dist_size = %d\n\tb_size = %lld\n",
-        llu(op_p->handle), (int)s_attr.uid, (int)s_attr.mode,
-        (int)s_attr.type, (int)s_attr.dfile_count, (int)s_attr.dist_size,
-        llu(b_size));
-
     attr = op_p->u.d_getattr.attr_p;
     trove_ds_stored_to_attr(s_attr, *attr, b_size);
 
@@ -1304,14 +1283,6 @@ static int dbpf_dspace_getattr_list_op_s
             op_p->u.d_getattr_list.error_p[i] = -TROVE_EIO;
             continue;
         }
-
-        gossip_debug(
-            GOSSIP_TROVE_DEBUG, "ATTRIB: retrieved attributes "
-            "from DISK for key %llu\n\tuid = %d, mode = %d, type = %d, "
-            "dfile_count = %d, dist_size = %d\n\tb_size = %lld, k_size = %lld\n",
-            llu(op_p->u.d_getattr_list.handle_array[i]), (int)s_attr.uid, (int)s_attr.mode,
-            (int)s_attr.type, (int)s_attr.dfile_count, (int)s_attr.dist_size,
-            llu(b_size), llu(k_size));
 
         attr = &op_p->u.d_getattr_list.attr_p[i];
         trove_ds_stored_to_attr(s_attr, *attr, b_size);
Index: src/io/trove/trove-dbpf/dbpf-mgmt.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-mgmt.c,v
retrieving revision 1.100
diff -u -a -p -r1.100 dbpf-mgmt.c
--- src/io/trove/trove-dbpf/dbpf-mgmt.c	28 Jan 2008 19:43:31 -0000	1.100
+++ src/io/trove/trove-dbpf/dbpf-mgmt.c	6 Feb 2008 00:38:25 -0000
@@ -345,7 +345,7 @@ int dbpf_collection_setinfo(TROVE_method
                          "ranges to %s\n", 
                          (int) coll_id, (char *)parameter);
             ret = trove_set_handle_ranges(
-                coll_id, context_id, (char *)parameter);
+                coll_id, (char *)parameter);
             break;
         case TROVE_COLLECTION_HANDLE_TIMEOUT:
             gossip_debug(GOSSIP_TROVE_DEBUG, 
@@ -355,7 +355,7 @@ int dbpf_collection_setinfo(TROVE_method
                          (long)((((struct timeval *)parameter)->tv_sec * 1e6) +
                                 ((struct timeval *)parameter)->tv_usec));
             ret = trove_set_handle_timeout(
-                coll_id, context_id, (struct timeval *)parameter);
+                coll_id, (struct timeval *)parameter);
             break;
         case TROVE_COLLECTION_ATTR_CACHE_KEYWORDS:
             gossip_debug(GOSSIP_TROVE_DEBUG, 
Index: src/io/trove/trove-handle-mgmt/trove-handle-mgmt.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-handle-mgmt/trove-handle-mgmt.c,v
retrieving revision 1.46
diff -u -a -p -r1.46 trove-handle-mgmt.c
--- src/io/trove/trove-handle-mgmt/trove-handle-mgmt.c	15 Aug 2007 18:43:09 -0000	1.46
+++ src/io/trove/trove-handle-mgmt/trove-handle-mgmt.c	6 Feb 2008 00:38:25 -0000
@@ -9,6 +9,9 @@
 #include <string.h>
 #include <assert.h>
 #include <sys/time.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <unistd.h>
 
 #include "trove.h"
 #include "quickhash.h"
@@ -19,239 +22,124 @@
 #include "gen-locks.h"
 #include "pvfs2-internal.h"
 
-/*
-  this is an internal structure and shouldn't be used
-  by anyone except this module
-*/
+#define TROVE_HANDLE_MGMT_MAX_RECENTLY_FREED 262144
+
 typedef struct
 {
-    struct qlist_head hash_link;
+    TROVE_handle handle;
+    struct timeval freed_time;
+    struct qhash_head handle_hash_link;
+    struct qlist_head handle_list_link;
+} handle_entry_t;
 
+typedef struct
+{
     TROVE_coll_id coll_id;
-    int have_valid_ranges;
-
-    struct handle_ledger *ledger;
-} handle_ledger_t;
+    struct timeval timeout;
+    struct qhash_table *recently_freed_table;
+    struct qlist_head expiry_order_list;
+    TROVE_handle_extent_array default_ranges;
+    int count;
 
-static struct qhash_table *s_fsid_to_ledger_table = NULL;
+    struct qlist_head collection_link;
+} collection_entry_t;
 
-/* these are based on code from src/server/request-scheduler.c */
-static int hash_fsid(void *fsid, int table_size);
-static int hash_fsid_compare(void *key, struct qlist_head *link);
+static QLIST_HEAD(freed_list);
 
 static gen_mutex_t trove_handle_mutex = GEN_MUTEX_INITIALIZER;
 
-/* trove_check_handle_ranges:
- *  internal function to verify that handles
- *  on disk match our assigned handles.
- *  this function is *very* expensive.
- *
- * coll_id: id of collection which we will verify
- * extent_list: llist of legal handle ranges/extents
- * ledger: a book-keeping ledger object
- *
- * returns 0 on success; -1 otherwise
- */
-static int trove_check_handle_ranges(TROVE_coll_id coll_id,
-                                     TROVE_context_id context_id,
-                                     PINT_llist *extent_list,
-                                     struct handle_ledger *ledger)
-{
-    int ret = -1, i = 0, count = 0, op_count = 0;
-    TROVE_op_id op_id = 0;
-    TROVE_ds_state state = 0;
-    TROVE_ds_position pos = TROVE_ITERATE_START;
-    static TROVE_handle handles[MAX_NUM_VERIFY_HANDLE_COUNT] =
-        {TROVE_HANDLE_NULL};
-
-    if (extent_list && ledger)
-    {
-        count = MAX_NUM_VERIFY_HANDLE_COUNT;
-
-        while(count > 0)
-        {
-            ret = trove_dspace_iterate_handles(coll_id,&pos,handles,
-                                               &count,0,NULL,NULL,
-                                               context_id,&op_id);
-            while(ret == 0)
-            {
-                ret = trove_dspace_test(coll_id,op_id,context_id,
-                                        &op_count,NULL,NULL,&state,
-                                        TROVE_DEFAULT_TEST_TIMEOUT);
-            }
-
-            /* check result of testing */
-            if (ret < 0)
-            {
-                gossip_debug(GOSSIP_TROVE_DEBUG,
-                             "dspace test of iterate_handles failed\n");
-                return ret;
-            }
-
-            ret = 0;
-
-            /* also check result of actual operation, in this case,
-             * trove_dspace_iterate_handles
-             */
-            if(state < 0)
-            {
-                gossip_debug(GOSSIP_TROVE_DEBUG,
-                             "trove_dspace_iterate_handles failed\n");
-                return state;
-            }
-
-            /* look for special case of a blank fs */
-            if ((count == 1) && (handles[0] == 0))
-            {
-                gossip_debug(GOSSIP_TROVE_DEBUG,
-                             "* Trove: Assuming a blank filesystem\n");
-                return ret;
-            }
+static struct random_data trove_handle_random_data;
+static char randstate[256];
+static int use_rand = 0;
 
-            if (count > 0)
-            {
-                for(i = 0; i != count; i++)
-                {
-                    /* check every item in our range list */
-                    if (!PINT_handle_in_extent_list(extent_list,
-                                                    handles[i]))
-                    {
-                        gossip_err(
-                            "Error: handle %llu is invalid "
-                            "(out of bounds)\n", llu(handles[i]));
-                        return -1;
-                    }
-
-		    /* remove handle from trove-handle-mgmt */
-		    ret = trove_handle_remove(ledger, handles[i]);
-		    if (ret != 0)
-                    {
-			gossip_debug(
-                            GOSSIP_TROVE_DEBUG, "could not remove "
-                            "handle %llu\n", llu(handles[i]));
-			break;
-		    }
-                }
-                ret = ((i == count) ? 0 : -1);
-            }
-        }
-    }
-    return ret;
+static int handle_entry_compare(void *key, struct qhash_head *link)
+{
+    handle_entry_t *hent = qhash_entry(link, handle_entry_t, handle_hash_link);
+    return (*(uint64_t *)key) == hent->handle;
 }
 
-static int trove_map_handle_ranges( PINT_llist *extent_list,
-                                   struct handle_ledger *ledger)
+static collection_entry_t *get_collection(TROVE_coll_id coll_id)
 {
-    int ret = -1;
-    PINT_llist *cur = NULL;
-    PVFS_handle_extent *cur_extent = NULL;
-    int64_t total_handles=0;
-
-    if (extent_list && ledger)
+    collection_entry_t *entry;
+    qlist_for_each_entry(entry, &freed_list, collection_link)
     {
-        cur = extent_list;
-        while(cur)
+        if(entry->coll_id == coll_id)
         {
-            cur_extent = PINT_llist_head(cur);
-            if (!cur_extent)
-            {
-                break;
-            }
-
-	    ret = trove_handle_ledger_addextent(ledger, cur_extent);
-	    if (ret != 0)
-            {
-		break;
-            }
-
-	    /* if, for example, you had a 'first' of 5 and a 'last' of
-	     * 5, the difference is 0, but there is one handle */
-	    total_handles += (cur_extent->last - cur_extent->first + 1);
-            cur = PINT_llist_next(cur);
+            return entry;
         }
-	trove_handle_ledger_set_threshold(ledger, total_handles);
     }
-    return ret;
-}
-
-
-static handle_ledger_t *get_or_add_handle_ledger(TROVE_coll_id coll_id)
-{
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
 
-    /* search for a matching entry */
-    hash_link = qhash_search(s_fsid_to_ledger_table,&(coll_id));
-    if (hash_link)
+    /* no entry with coll_id, create one */
+    entry = malloc(sizeof(*entry));
+    if(!entry)
     {
-        /* return it if it exists */
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
+        return NULL;
     }
-    else
+    entry->coll_id = coll_id;
+    entry->timeout.tv_sec = TROVE_DEFAULT_HANDLE_PURGATORY_SEC;
+    entry->timeout.tv_usec = 0;
+    entry->recently_freed_table = qhash_init(
+        handle_entry_compare,
+        quickhash_64bit_hash,
+        32768);
+    if(!entry->recently_freed_table)
     {
-        /* alloc, initialize, then return otherwise */
-        ledger = (handle_ledger_t *)malloc(sizeof(handle_ledger_t));
-        if (ledger)
-        {
-            ledger->coll_id = coll_id;
-            ledger->have_valid_ranges = 0;
-            ledger->ledger = trove_handle_ledger_init(coll_id,NULL);
-            if (ledger->ledger)
-            {
-                qhash_add(s_fsid_to_ledger_table,
-                          &(coll_id),&(ledger->hash_link));
-            }
-            else
-            {
-                free(ledger);
-                ledger = NULL;
-            }
-        }
+        free(entry);
+        return NULL;
     }
-    return ledger;
-}
-
-/* hash_fsid()
- *
- * hash function for fsids added to table
- *
- * returns integer offset into table
- */
-static int hash_fsid(void *fsid, int table_size)
-{
-    /* TODO: update this later with a better hash function,
-     * depending on what fsids look like, for now just modding
-     *
-     */
-    unsigned long tmp = 0;
-    TROVE_coll_id *real_fsid = (TROVE_coll_id *)fsid;
+    INIT_QLIST_HEAD(&entry->expiry_order_list);
+    entry->default_ranges.extent_count = 1;
+    entry->default_ranges.extent_array = malloc(sizeof(PVFS_handle_extent));
+    entry->default_ranges.extent_array[0].first = 0;
+    entry->default_ranges.extent_array[0].first = UINT64_MAX;
+    entry->count = 0;
 
-    tmp += (*(real_fsid));
-    tmp = tmp%table_size;
-
-    return ((int)tmp);
+    gen_mutex_lock(&trove_handle_mutex);
+    qlist_add(&entry->collection_link, &freed_list);
+    gen_mutex_unlock(&trove_handle_mutex);
+    return entry;
 }
 
-/* hash_fsid_compare()
- *
- * performs a comparison of a hash table entry to a given key
- * (used for searching)
- *
- * returns 1 if match found, 0 otherwise
- */
-static int hash_fsid_compare(void *key, struct qlist_head *link)
+static int handle_is_recently_freed(TROVE_coll_id coll_id, TROVE_handle handle)
 {
-    handle_ledger_t *ledger = NULL;
-    TROVE_coll_id *real_fsid = (TROVE_coll_id *)key;
+    struct qhash_head *link;
+    handle_entry_t *hent;
+    handle_entry_t *iter, *iter2;
+    collection_entry_t *entry;
+    struct timeval current;
 
-    ledger = qlist_entry(link, handle_ledger_t, hash_link);
-    assert(ledger);
+    entry = get_collection(coll_id);
+    if(!entry)
+    {
+        return 0;
+    }
 
-    if (ledger->coll_id == *real_fsid)
+    gen_mutex_lock(&trove_handle_mutex);
+    if((link = qhash_search_and_remove(
+                entry->recently_freed_table, (void *)&handle)))
     {
-        return(1);
+        hent = qhash_entry(link, handle_entry_t, handle_hash_link);
+        gettimeofday(&current, NULL);
+        if(((current.tv_sec + current.tv_usec*1e-6) -
+            (hent->freed_time.tv_sec + hent->freed_time.tv_usec*1e-6)) <
+           (entry->timeout.tv_sec + entry->timeout.tv_usec*1e-6))
+        {
+            /* free up expired entries */
+            qlist_for_each_entry_safe(
+                iter, iter2, &hent->handle_list_link, handle_list_link)
+            {
+                qlist_del(&iter->handle_list_link);
+                qhash_search_and_remove(
+                    entry->recently_freed_table, (void *)&iter->handle);
+                free(iter);
+            }
+
+            gen_mutex_unlock(&trove_handle_mutex);
+            return 1;
+        }
     }
-    return(0);
+    gen_mutex_unlock(&trove_handle_mutex);
+    return 0;
 }
 
 int trove_handle_mgmt_initialize()
@@ -260,69 +148,77 @@ int trove_handle_mgmt_initialize()
       due to weird trove_initialize usages; this will always succeed
       unless the hash table initialization really fails.
     */
-    int ret = 0;
-    gen_mutex_lock(&trove_handle_mutex);
+    int rfd;
+    int ret;
 
-    if (s_fsid_to_ledger_table == NULL)
+    rfd = open("/dev/urandom", O_RDONLY, 0);
+    if(rfd < 0)
     {
-        s_fsid_to_ledger_table = qhash_init(hash_fsid_compare,
-                                            hash_fsid,67);
-        ret = (s_fsid_to_ledger_table ? 0 : -1);
+        srand(time(NULL));
+        use_rand = 1;
+        return 0;
     }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return ret;
+    ret = read(rfd, randstate, 256);
+    if(ret < 256)
+    {
+        close(rfd);
+        srand(time(NULL));
+        use_rand = 0;
+        return 0;
+    }
+    close(rfd);
+    initstate_r((long)time(NULL), randstate, 256, &trove_handle_random_data);
+
+    return 0;
 }
 
 int trove_set_handle_ranges(TROVE_coll_id coll_id,
-                            TROVE_context_id context_id,
-                            char *handle_range_str)
+                            char *ranges)
 {
-    int ret = -TROVE_EINVAL;
-    PINT_llist *extent_list = NULL;
-    handle_ledger_t *ledger = NULL;
+    int i;
+    PINT_llist *elist, *tmp;
+    collection_entry_t *coll_entry;
+    TROVE_handle_extent *e;
+
+    elist = PINT_create_extent_list(ranges);
+    if(!elist)
+    {
+        return -TROVE_EINVAL;
+    }
 
     gen_mutex_lock(&trove_handle_mutex);
-    if (handle_range_str)
+    coll_entry = get_collection(coll_id);
+
+    if(coll_entry->default_ranges.extent_array)
     {
-        extent_list = PINT_create_extent_list(handle_range_str);
-        if (extent_list)
-        {
-            /*
-              get existing ledger management struct if any;
-              create otherwise
-            */
-            ledger = get_or_add_handle_ledger(coll_id);
-            if (ledger)
-            {
-                /* assert the internal ledger struct is valid */
-                assert(ledger->ledger);
-		
-		/* tell trove what are our valid ranges are */
-		ret = trove_map_handle_ranges(
-                    extent_list, ledger->ledger);
-		if (ret != 0)
-                {
-                    gen_mutex_unlock(&trove_handle_mutex);
-                    return ret;
-                }
-
-                ret = trove_check_handle_ranges(
-                    coll_id,context_id,extent_list,ledger->ledger);
-		if (ret != 0)
-                {
-                    gen_mutex_unlock(&trove_handle_mutex);
-                    return ret;
-                }
-                else
-                {
-                    ledger->have_valid_ranges = 1;
-                }
-            }
-            PINT_release_extent_list(extent_list);
-        }
+        free(coll_entry->default_ranges.extent_array);
     }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return ret;
+    coll_entry->default_ranges.extent_count = PINT_llist_count(elist);
+
+    coll_entry->default_ranges.extent_array =
+        malloc(sizeof(TROVE_handle_extent) * 
+               coll_entry->default_ranges.extent_count);
+    if(!coll_entry->default_ranges.extent_array)
+    {
+        return -TROVE_ENOMEM;
+    }
+
+    tmp = elist;
+    i = 0;
+    do
+    {
+        e = PINT_llist_head(tmp);
+
+        coll_entry->default_ranges.extent_array[i].first = e->first;
+        coll_entry->default_ranges.extent_array[i].last = e->last;
+        ++i;
+
+    } while((tmp = PINT_llist_next(tmp)));
+    gen_mutex_lock(&trove_handle_mutex);
+
+    PINT_llist_free(elist, free);
+
+    return 0;
 }
 
 /*
@@ -331,272 +227,179 @@ int trove_set_handle_ranges(TROVE_coll_i
  * avaliable handles 
  */
 int trove_set_handle_timeout(TROVE_coll_id coll_id,
-                             TROVE_context_id context_id,
                              struct timeval *timeout)
 {
-    int ret = -1;
-    handle_ledger_t *ledger = NULL;
+    collection_entry_t *entry;
 
     gen_mutex_lock(&trove_handle_mutex);
-    ledger = get_or_add_handle_ledger(coll_id);
-    if (ledger)
+    entry = get_collection(coll_id);
+    if (!entry)
     {
-	/* assert the internal ledger struct is valid */
-	assert(ledger->ledger);
-
-	/*
-          tell trove how long the timeout should be.
-          if 0 is specified, use a reasonable default value
-        */
-        timeout->tv_sec = ((timeout->tv_sec == 0) ?
-                           TROVE_DEFAULT_HANDLE_PURGATORY_SEC :
-                           timeout->tv_sec);
-	ret = trove_ledger_set_timeout(ledger->ledger, timeout);
-
-        gossip_debug(GOSSIP_TROVE_DEBUG, "- set handle re-use "
-                     "timeout to %d seconds (ret=%d)\n",
-                     (int)timeout->tv_sec, ret);
+        gen_mutex_unlock(&trove_handle_mutex);
+        return -TROVE_EINVAL;
     }
+
+    entry->timeout.tv_usec = timeout->tv_usec;
+    entry->timeout.tv_sec = timeout->tv_sec;
+
     gen_mutex_unlock(&trove_handle_mutex);
-    return ret;
+    return 0;
 }
 
-TROVE_handle trove_handle_alloc(TROVE_coll_id coll_id)
+TROVE_handle trove_handle_alloc(
+    TROVE_coll_id coll_id)
 {
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
-    TROVE_handle handle = TROVE_HANDLE_NULL;
+    collection_entry_t *colle;
 
-    gen_mutex_lock(&trove_handle_mutex);
-    hash_link = qhash_search(s_fsid_to_ledger_table,&(coll_id));
-    if (hash_link)
-    {
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-        if (ledger && (ledger->have_valid_ranges == 1))
-        {
-            handle = trove_ledger_handle_alloc(ledger->ledger);
-        }
-    }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return handle;
+    colle = get_collection(coll_id);
+    return trove_handle_alloc_from_range(coll_id, &colle->default_ranges);
 }
 
+#define GEN_RAND32(randvar)                               \
+do {                                                      \
+    if(use_rand)                                          \
+    {                                                     \
+        randvar = rand();                                 \
+    }                                                     \
+    else                                                  \
+    {                                                     \
+        random_r(&trove_handle_random_data, &randvar);    \
+    }                                                     \
+} while(0)
+
 TROVE_handle trove_handle_alloc_from_range(
     TROVE_coll_id coll_id,
     TROVE_handle_extent_array *extent_array)
 {
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
     TROVE_handle handle = TROVE_HANDLE_NULL;
+    TROVE_handle range;
     int i = 0;
+    int32_t r1, r2;
 
-    gen_mutex_lock(&trove_handle_mutex);
-    hash_link = qhash_search(s_fsid_to_ledger_table, &(coll_id));
-    if (hash_link)
-    {
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-        if (ledger && (ledger->have_valid_ranges == 1))
-        {
-            for(i = 0; i < extent_array->extent_count; i++)
-            {
-                handle = trove_ledger_handle_alloc_from_range(
-                    ledger->ledger, &(extent_array->extent_array[i]));
-                if (handle != TROVE_HANDLE_NULL)
-                {
-                    break;
-                }
-            }
-        }
-    }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return handle;
-}
+retry_handle_alloc:
 
-int trove_handle_peek(
-    TROVE_coll_id coll_id,
-    TROVE_handle *out_handle_array,
-    int max_num_handles,
-    int *returned_handle_count)
-{
-    int ret = -TROVE_EINVAL;
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
+    /* pick an extent randomly from the array */
+    GEN_RAND32(r1);
+    i = r1 % extent_array->extent_count;
 
-    if (!out_handle_array || !returned_handle_count)
-    {
-        return ret;
-    }
+    handle = r1;
 
-    gen_mutex_lock(&trove_handle_mutex);
-    hash_link = qhash_search(s_fsid_to_ledger_table,&(coll_id));
-    if (hash_link)
-    {
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-        if (ledger && (ledger->have_valid_ranges == 1))
-        {
-            ret = trove_ledger_peek_handles(
-                ledger->ledger, out_handle_array,
-                max_num_handles, returned_handle_count);
-        }
-    }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return ret;
-}
+    GEN_RAND32(r2);
+    handle |= ((PVFS_handle)r2) << 32;
 
-int trove_handle_peek_from_range(
-    TROVE_coll_id coll_id,
-    TROVE_handle_extent_array *extent_array,
-    TROVE_handle *out_handle_array,
-    int max_num_handles,
-    int *returned_handle_count)
-{
-    int ret = -TROVE_EINVAL, i = 0;
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
+    range = extent_array->extent_array[i].last -
+        extent_array->extent_array[i].first;
 
-    if (!extent_array || !out_handle_array || !returned_handle_count)
-    {
-        return ret;
-    }
+    handle = handle % range;
+    handle += extent_array->extent_array[i].first;
 
-    gen_mutex_lock(&trove_handle_mutex);
-    hash_link = qhash_search(s_fsid_to_ledger_table,&(coll_id));
-    if (hash_link)
+    /* verify that its not in the freed list */
+    if(handle_is_recently_freed(coll_id, handle))
     {
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-        if (ledger && (ledger->have_valid_ranges == 1))
-        {
-            for(i = 0; i < extent_array->extent_count; i++)
-            {
-                ret = trove_ledger_peek_handles_from_extent(
-                    ledger->ledger, &(extent_array->extent_array[i]),
-                    out_handle_array, max_num_handles,
-                    returned_handle_count);
-                /*
-                  if we get any handles back, just return, even if
-                  it's not the full amount requested
-                */
-                if (ret == 0)
-                {
-                    assert(*returned_handle_count > 0);
-                    break;
-                }
-            }
-        }
+        goto retry_handle_alloc;
     }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return ret;
+
+    return handle;
 }
 
-int trove_handle_set_used(TROVE_coll_id coll_id, TROVE_handle handle)
+int trove_handle_free(TROVE_coll_id coll_id, TROVE_handle handle)
 {
-    int ret = -1;
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
+    struct qlist_head *iter, *iter2;
+    collection_entry_t *cent;
+    handle_entry_t *hent, *newent;
+    struct timeval current;
 
     gen_mutex_lock(&trove_handle_mutex);
-    hash_link = qhash_search(s_fsid_to_ledger_table,&(coll_id));
-    if (hash_link)
+    cent = get_collection(coll_id);
+    if(!cent)
     {
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-        if (ledger)
-        {
-            ret = trove_handle_remove(ledger->ledger,handle);
-        }
+        return -TROVE_EINVAL;
     }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return ret;
-}
 
-int trove_handle_free(TROVE_coll_id coll_id, TROVE_handle handle)
-{
-    int ret = -1;
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
-
-    gen_mutex_lock(&trove_handle_mutex);
-    hash_link = qhash_search(s_fsid_to_ledger_table,&(coll_id));
-    if (hash_link)
+    if(qhash_search(cent->recently_freed_table, &handle))
     {
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-        if (ledger)
-        {
-            ret = trove_ledger_handle_free(ledger->ledger, handle);
-        }
+        /* found a handle already in the freed table */
+        return -TROVE_EINVAL;
     }
-    gen_mutex_unlock(&trove_handle_mutex);
-    return ret;
-}
 
-/* trove_handle_get_statistics()
- *
- * retrieves handle usage statistics from given collection; right now
- * this simply means returning the count of free handles
- *
- * returns 0 on success, -1 on error
- */
-int trove_handle_get_statistics(TROVE_coll_id coll_id, uint64_t* free_count)
-{
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
+    gettimeofday(&current, NULL);
 
-    gen_mutex_lock(&trove_handle_mutex);
-    hash_link = qhash_search(s_fsid_to_ledger_table,&(coll_id));
-    if (hash_link)
+    /* free up expired entries first */
+    qlist_for_each_reverse_safe(iter, iter2, &cent->expiry_order_list)
     {
-        ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-        if (ledger)
+        hent = qlist_entry(iter, handle_entry_t, handle_list_link);
+        if(((current.tv_sec + current.tv_usec*1e6) -
+            (hent->freed_time.tv_sec + hent->freed_time.tv_usec*1e6)) >
+           (cent->timeout.tv_sec + cent->timeout.tv_usec*1e-6))
         {
-	    trove_handle_ledger_get_statistics(ledger->ledger, 
-		free_count);
-            gen_mutex_unlock(&trove_handle_mutex);
-	    return(0);
+            qlist_del(iter);
+            qhash_search_and_remove(cent->recently_freed_table,
+                                    &hent->handle);
+            free(hent);
+            cent->count--;
         }
-	else
-	{
-            gen_mutex_unlock(&trove_handle_mutex);
-	    return(-PVFS_ENOENT);
-	}
     }
-    else
+
+    if(cent->count > TROVE_HANDLE_MGMT_MAX_RECENTLY_FREED)
+    {
+        /* delete the last even though it hasn't expired
+         * so that we can insert one more
+         */
+        hent = qlist_entry(
+            cent->expiry_order_list.prev,
+            handle_entry_t, handle_list_link);
+        qlist_del(&hent->handle_list_link);
+        qhash_search_and_remove(
+            cent->recently_freed_table, &hent->handle);
+        free(hent);
+        cent->count--;
+    }
+
+    /* now create and add the freed entry */
+    newent = malloc(sizeof(*hent));
+    if(!newent)
     {
         gen_mutex_unlock(&trove_handle_mutex);
-	return(-PVFS_ENOENT);
+        return -TROVE_ENOMEM;
     }
+
+    newent->handle = handle;
+    newent->freed_time = current;
+    qhash_add(cent->recently_freed_table,
+              &hent->handle, &hent->handle_hash_link);
+    qlist_add(&hent->handle_list_link, &cent->expiry_order_list);
+    cent->count++;
+    gen_mutex_unlock(&trove_handle_mutex);
+
+    return 0;
 }
 
-int trove_handle_mgmt_finalize()
+int trove_handle_mgmt_finalize(void)
 {
-    int i;
-    handle_ledger_t *ledger = NULL;
-    struct qlist_head *hash_link = NULL;
+    struct qlist_head *iter1, *iter2;
+    struct qlist_head *hiter1, *hiter2;
+    collection_entry_t *colle;
+    handle_entry_t *hentry;
 
     gen_mutex_lock(&trove_handle_mutex);
-    /*
-      this is an exhaustive and slow iterate.  speed this up
-      if 'finalize' is something that will be done frequently.
-    */
-    for (i = 0; i < s_fsid_to_ledger_table->table_size; i++)
+    qlist_for_each_safe(iter1, iter2, &freed_list)
     {
-        do
+        colle = qlist_entry(iter1, collection_entry_t, collection_link);
+        qlist_for_each_safe(hiter1, hiter2, &colle->expiry_order_list)
         {
-            hash_link =
-                qhash_search_and_remove_at_index(s_fsid_to_ledger_table, i);
-            if (hash_link)
-            {
-                ledger = qlist_entry(hash_link, handle_ledger_t, hash_link);
-                assert(ledger);
-                assert(ledger->ledger);
-
-                trove_handle_ledger_free(ledger->ledger);
-                free(ledger);
-            }
-        } while(hash_link);
+            hentry = qlist_entry(hiter1, handle_entry_t, handle_list_link);
+            qhash_search_and_remove(
+                colle->recently_freed_table, &hentry->handle);
+            qlist_del(&hentry->handle_list_link);
+            free(hentry);
+        }
+        qhash_finalize(colle->recently_freed_table);
+        free(colle->default_ranges.extent_array);
+        free(colle);
+        qlist_del(&colle->collection_link);
     }
-    qhash_finalize(s_fsid_to_ledger_table);
-    s_fsid_to_ledger_table = NULL;
-
     gen_mutex_unlock(&trove_handle_mutex);
+
     return 0;
 }
 
Index: src/io/trove/trove-handle-mgmt/trove-handle-mgmt.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-handle-mgmt/trove-handle-mgmt.h,v
retrieving revision 1.14
diff -u -a -p -r1.14 trove-handle-mgmt.h
--- src/io/trove/trove-handle-mgmt/trove-handle-mgmt.h	15 Mar 2007 21:45:25 -0000	1.14
+++ src/io/trove/trove-handle-mgmt/trove-handle-mgmt.h	6 Feb 2008 00:38:25 -0000
@@ -17,59 +17,20 @@
 */
 int trove_handle_mgmt_initialize(void);
 
-int trove_set_handle_ranges(
-    TROVE_coll_id coll_id,
-    TROVE_context_id context_id,
-    char *handle_range_str);
-
 int trove_set_handle_timeout(
     TROVE_coll_id coll_id,
-    TROVE_context_id context_id,
     struct timeval * timeout);
 
 /*
-  returns a valid TROVE_handle on success and TROVE_HANDLE_NULL
-  otherwise (e.g.. no more free handles)
-*/
-TROVE_handle trove_handle_alloc(TROVE_coll_id coll_id);
-
-/*
   returns a valid TROVE_handle from the range-set given in the
   PVFS_handle_extent_array; TROVE_HANDLE_NULL otherwise
 */
 TROVE_handle trove_handle_alloc_from_range(
-    TROVE_coll_id, 
+    TROVE_coll_id,
     TROVE_handle_extent_array *extent_array);
 
-/*
-  fills in some number of handles that are not removed from the handle
-  allocator's free list and guarantees that subsequent calls to
-  trove_handle_alloc will return these handles in that order.  NOTE:
-  this list must NOT be stored or relied on because it could change as
-  calls to trove_handle_alloc and trove_handle_alloc_from_range calls
-  are made.  return value is 0 on success, non-zero on failure
-*/
-int trove_handle_peek(
-    TROVE_coll_id coll_id,
-    TROVE_handle *out_handle_array,
-    int max_num_handles,
-    int *returned_handle_count);
-
-/*
-  same as trove_handle_peek above except that the returned handles are
-  limited to the specified ranges.  return value is 0 on success,
-  non-zero on failure
-*/
-int trove_handle_peek_from_range(
-    TROVE_coll_id coll_id,
-    TROVE_handle_extent_array *extent_array,
-    TROVE_handle *out_handle_array,
-    int max_num_handles,
-    int *returned_handle_count);
-
-int trove_handle_set_used(
-    TROVE_coll_id coll_id,
-    TROVE_handle handle);
+TROVE_handle trove_handle_alloc(
+    TROVE_coll_id coll_id);
 
 int trove_handle_free(
     TROVE_coll_id coll_id,
@@ -77,9 +38,9 @@ int trove_handle_free(
 
 int trove_handle_mgmt_finalize(void);
 
-int trove_handle_get_statistics(
+int trove_set_handle_ranges(
     TROVE_coll_id coll_id,
-    uint64_t *free_count);
+    char *ranges);
 
 #endif /* __TROVE_HANDLE_MGMT_H */
 
Index: src/io/trove/trove-handle-mgmt/trove-ledger.c
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-handle-mgmt/trove-ledger.c,v
retrieving revision 1.34
diff -u -a -p -r1.34 trove-ledger.c
--- src/io/trove/trove-handle-mgmt/trove-ledger.c	28 Jul 2004 14:32:52 -0000	1.34
+++ src/io/trove/trove-handle-mgmt/trove-ledger.c	6 Feb 2008 00:38:25 -0000
@@ -118,8 +118,28 @@ void trove_handle_ledger_free(struct han
 
 TROVE_handle trove_ledger_handle_alloc(struct handle_ledger *hl)
 {
+    long r1, r2;
+    int i;
+    TROVE_handle handle, range;
+
+#ifdef TROVE_HANDLE_LEDGER_ENABLED
     return (hl ? extentlist_get_and_dec_extent(
                 &(hl->free_list)) : TROVE_HANDLE_NULL);
+#else
+    r1 = random();
+
+    i = r1 % hl->free_list.num_extents;
+    
+    handle = r1;
+    if(sizeof(r1) == 4)
+    {
+        r2 = random();
+        handle |= ((PVFS_handle)r2) << 32;
+    }
+    range = hl->free_list.extents[i].last - hl->free_list.extents[i].first;
+    handle = handle % range;
+    return handle;
+#endif
 }
 
 /* if possible, allocate a handle within the given starting point and
Index: src/proto/pvfs2-req-proto.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/proto/pvfs2-req-proto.h,v
retrieving revision 1.151
diff -u -a -p -r1.151 pvfs2-req-proto.h
--- src/proto/pvfs2-req-proto.h	17 Aug 2007 04:04:27 -0000	1.151
+++ src/proto/pvfs2-req-proto.h	6 Feb 2008 00:38:25 -0000
@@ -759,7 +759,7 @@ do {                                    
 
 struct PVFS_servresp_statfs
 {
-    PVFS_statfs stat;
+    struct PVFS_statfs stat;
 };
 endecode_fields_1_struct(
     PVFS_servresp_statfs,
Index: src/server/statfs.sm
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/server/statfs.sm,v
retrieving revision 1.16
diff -u -a -p -r1.16 statfs.sm
--- src/server/statfs.sm	30 Aug 2007 00:13:45 -0000	1.16
+++ src/server/statfs.sm	6 Feb 2008 00:38:25 -0000
@@ -62,17 +62,6 @@ static PINT_sm_action statfs_do_statfs(
     TROVE_context_id tmp_context;
     struct server_configuration_s *user_opts = get_server_config_struct();
 
-    /* first try to gather handle statistics */
-    ret = trove_handle_get_statistics(
-        s_op->req->u.statfs.fs_id,
-        &s_op->resp.u.statfs.stat.handles_available_count);
-
-    if (ret < 0)
-    {
-        js_p->error_code = ret;
-        return SM_ACTION_COMPLETE;
-    }
-
     /* find out how many total handles this server controls */
     ret = PINT_cached_config_get_server_handle_count(
         user_opts->host_id, s_op->req->u.statfs.fs_id,
