I've made some inline comments about the dbpf thread code.

Also, I have a general question about server shutdown. Julian's changes to dbpf got me thinking about how shutdown works. As far as I can tell, on a SIGINT from the user the server sets a flag, and the server loop that continuously checks for completed jobs tests that flag on each iteration. Once the flag is set, the server calls a shutdown function, which just calls finalize for each of the modules (bmi, trove, flow, etc.) and then exits.

If I'm reading the code right, this allows pending requests to be cancelled without completing, and where they get cancelled is nondeterministic, since a request usually consists of more than one job. This seems undesirable to me... am I missing something in the code? Shouldn't we remove all the queued bmi unexpected jobs, wait for all current requests to finish, and then shut down?
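A minimal sketch of the drain-then-finalize sequence I have in mind, assuming a single-threaded main loop like the current one. All names here (server_phase, server_state, server_accept_request, server_update_phase, handle_sigint) are hypothetical, not existing PVFS2 code, and the actual cancellation of posted BMI unexpected receives is only indicated by a comment.

```c
#include <signal.h>
#include <stdbool.h>

/* Set from the SIGINT handler; sig_atomic_t keeps the write async-signal-safe. */
volatile sig_atomic_t shutdown_requested = 0;

void handle_sigint(int sig)
{
    (void)sig;
    shutdown_requested = 1;
}

enum server_phase {
    SERVER_RUNNING,   /* accepting new requests */
    SERVER_DRAINING,  /* no new requests; waiting for in-flight ones */
    SERVER_FINALIZE   /* safe to call finalize on bmi, trove, flow, ... */
};

struct server_state {
    enum server_phase phase;
    int inflight_requests;  /* requests started but not yet completed */
};

/* Called when an unexpected message would start a new request.
 * Returns false once shutdown has begun. */
bool server_accept_request(struct server_state *s)
{
    if (s->phase != SERVER_RUNNING)
        return false;
    s->inflight_requests++;
    return true;
}

/* Called when the last job of a request has completed. */
void server_request_done(struct server_state *s)
{
    s->inflight_requests--;
}

/* Called once per iteration of the main job-testing loop. */
void server_update_phase(struct server_state *s, int shutdown_flag)
{
    if (s->phase == SERVER_RUNNING && shutdown_flag) {
        /* Here the server would cancel its posted BMI unexpected
         * receives so that no new requests can arrive. */
        s->phase = SERVER_DRAINING;
    }
    if (s->phase == SERVER_DRAINING && s->inflight_requests == 0)
        s->phase = SERVER_FINALIZE;
}
```

The idea is that finalize for the modules would only run once the phase reaches SERVER_FINALIZE, i.e. after the last in-flight request has finished all of its jobs.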

-sam

On Jul 20, 2006, at 1:12 PM, CVS commit program wrote:

Update of /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf
In directory parlweb1:/tmp/cvs-serv18266/src/io/trove/trove-dbpf

Modified Files:
      Tag: kunkel-branch
        dbpf-bstream.c dbpf-context.c dbpf-context.h dbpf-dspace.c
        dbpf-keyval.c dbpf-mgmt.c dbpf-op-queue.c dbpf-op-queue.h
        dbpf-op.c dbpf-op.h dbpf-open-cache.c dbpf-sync.c dbpf-sync.h
        dbpf-thread.c dbpf-thread.h dbpf.h
Log Message:
Checkin of previous patches



--snip--

In dbpf-thread.c:dbpf_thread_function, you have this:

+    while(dbpf_threads_running)
     {
-        /* grab next op from queue and mark it as in service */
-        gen_mutex_lock(&dbpf_op_queue_mutex);
-        cur_op = dbpf_op_queue_shownext(&dbpf_op_queue);
-        if (cur_op)
-        {
-            gen_mutex_lock(&cur_op->mutex);
-            if (cur_op->op.state != OP_QUEUED)
-            {
-                gossip_err("INVALID OP STATE FOUND %d (op is %p)\n",
-                           cur_op->op.state, cur_op);
-                assert(cur_op->op.state == OP_QUEUED);
-            }
-
-            dbpf_queued_op_dequeue_nolock(cur_op);
-
-            cur_op->op.state = OP_IN_SERVICE;
-            gen_mutex_unlock(&cur_op->mutex);
-        }
-        gen_mutex_unlock(&dbpf_op_queue_mutex);
+ gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_meta_thread_function \"%s\" ITERATING\n",thread_type);
+        /* check if we any have ops to service in our work queue */
+        gen_mutex_lock(work_queue_mutex);
+        op_queued_empty = dbpf_op_queue_empty(work_queue);

-        /* if there's no work to be done, return immediately */
-        if (cur_op == NULL)
+        if (op_queued_empty)
         {
-            return ret;
+ gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_meta_thread_function \"%s\" QUEUE EMPTY\n",thread_type);
+ /* it is possible that the timing is so bad at startup that thread hang up...*/
+            ret = pthread_cond_wait(queue_cond,
+                                         work_queue_mutex);
+            if (! dbpf_threads_running){
+                gen_mutex_unlock(work_queue_mutex);
+                continue;
+            }
         }


This looks like a bug. Because pthread_cond_wait can wake up spuriously, you always want to go back and re-check whether the op queue is still empty. I think you can solve that by turning the if conditional into a while loop, similar to how we use cond_wait in other parts of the code.
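For reference, a sketch of the usual pattern, assuming a simplified queue struct that stands in for dbpf_op_queue (struct work_queue and wait_for_work_locked are hypothetical names, not existing dbpf code):

```c
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical minimal work queue; stands in for dbpf_op_queue. */
struct work_queue {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int             count;    /* number of queued ops */
    bool            running;  /* cleared by finalize with the mutex held */
};

/* Wait until there is work or the queue has been shut down.
 * Must be called with q->mutex held; returns with it still held.
 * Returns true if an op is available, false if the caller should exit. */
bool wait_for_work_locked(struct work_queue *q)
{
    /* A while, not an if: pthread_cond_wait may return spuriously, or
     * another thread may already have taken the op, so the predicate
     * has to be re-checked after every wakeup. */
    while (q->running && q->count == 0)
        pthread_cond_wait(&q->cond, &q->mutex);
    return q->running && q->count > 0;
}
```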

There's a separate issue with using dbpf_threads_running as the single boolean check for all of the dbpf threads, and with not holding the mutex while the while expression is evaluated. If finalize is called, I don't think we want the remaining operations in the work queue to be serviced, but it's possible that one of the dbpf threads is already servicing an op, or has just done the boolean check before the value was changed.

I'd prefer to see a separate boolean flag for each thread. Each flag can be changed within the same locked region in which cond_signal is called. You could also move the lock outside of the while loop so you don't 'miss' any changes to the variable, and so you avoid servicing ops after finalize has been called.
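Roughly what I mean, as a sketch only: struct dbpf_worker, worker_main, worker_stop, and service_one_op are hypothetical, and an integer count stands in for the real work queue. The flag is only read with the lock held, and finalize clears it in the same locked region as the signal.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-thread state: each dbpf thread gets its own flag
 * instead of sharing the global dbpf_threads_running. */
struct dbpf_worker {
    pthread_mutex_t mutex;     /* protects every field below */
    pthread_cond_t  cond;
    bool            running;
    int             queued;    /* stand-in for the op queue length */
    int             serviced;  /* ops actually serviced */
};

/* Placeholder for the real service routine; runs without the lock. */
static void service_one_op(struct dbpf_worker *w)
{
    (void)w;
}

void *worker_main(void *arg)
{
    struct dbpf_worker *w = arg;

    pthread_mutex_lock(&w->mutex);         /* taken outside the loop */
    while (w->running) {                   /* flag read with the lock held */
        if (w->queued == 0) {
            pthread_cond_wait(&w->cond, &w->mutex);
            continue;                      /* re-check flag and queue */
        }
        w->queued--;                       /* dequeue under the lock */
        pthread_mutex_unlock(&w->mutex);
        service_one_op(w);                 /* do the work unlocked */
        pthread_mutex_lock(&w->mutex);
        w->serviced++;
    }
    pthread_mutex_unlock(&w->mutex);
    return NULL;
}

/* Finalize path: clear this thread's flag in the same locked region as
 * the signal, so the worker cannot miss it. Queued ops stay unserviced. */
void worker_stop(struct dbpf_worker *w)
{
    pthread_mutex_lock(&w->mutex);
    w->running = false;
    pthread_cond_signal(&w->cond);
    pthread_mutex_unlock(&w->mutex);
}
```

An op that is already being serviced when worker_stop is called still runs to completion, but once worker_stop returns the worker will not dequeue anything else.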

+ gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_meta_thread_function \"%s\" fetching new element \n",thread_type);
+        cur_op = dbpf_op_pop_front_nolock(work_queue);
+        dbpf_sync_coalesce_dequeue(cur_op);
+
+        assert(cur_op);
+        dbpf_op_change_status(cur_op, OP_IN_SERVICE);
+        gen_mutex_unlock(work_queue_mutex);

-        /* otherwise, service the current operation now */
+        /* service the current operation now */
         gossip_debug(GOSSIP_TROVE_OP_DEBUG,"***** STARTING TROVE "
                      "SERVICE ROUTINE (%s) *****\n",
                      dbpf_op_type_to_str(cur_op->op.type));
@@ -181,52 +242,44 @@ int dbpf_do_one_work_cycle(int *out_coun
         gossip_debug(GOSSIP_TROVE_OP_DEBUG,"***** FINISHED TROVE "
                      "SERVICE ROUTINE (%s) *****\n",
                      dbpf_op_type_to_str(cur_op->op.type));
-        if (ret == DBPF_OP_COMPLETE || ret < 0)
-        {
-            /* operation is done and we are telling the caller;
-             * ok to pull off queue now.
-             *
-             * returns error code from operation in queued_op struct
-             */
-            (*out_count)++;
-
-            /* this is a macro defined in dbpf-thread.h */
-            move_op_to_completion_queue(
-                cur_op, ((ret == 1) ? 0 : ret), OP_COMPLETED);
-        }
-        else if(ret == DBPF_OP_NEEDS_SYNC)
-        {
-            ret = dbpf_sync_coalesce(cur_op);
+
+        if ( DBPF_OP_MODIFIYING_META_OP(cur_op->op.type) ){

This should be MODIFYING (the macro is also misspelled MODIFIYING in dbpf.h).

+            if ( ret == DBPF_OP_COMPLETE )
+            {
+                ret = dbpf_sync_coalesce(cur_op, 1, 0);
+            }
+            else if( ret < 0 )
+            {
+                /*
+                 * We cannot enqueue error responses, instead we
+                 * sync and return !
+                 */
+                ret = dbpf_sync_coalesce(cur_op, 0, ret);
+            }
+            else
+            {
+                assert(0);


I've mentioned this before, and I don't want to sound like I'm harping, but in general I think asserts belong at the beginning of functions. If you're using them to check the result of a function call, they should come right after the call, and they should always evaluate a meaningful expression, since that expression is what gets printed to stderr when the assert fails (assert(0) just prints "0"). OK, I guess I am harping a little. ;-)
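For example, assuming a simplified op struct (struct op, service_op, and OP_COMPLETE are hypothetical stand-ins for the dbpf types and DBPF_OP_COMPLETE), the else { assert(0); } branch could become a single check on ret right after the service call:

```c
#include <assert.h>
#include <stddef.h>

#define OP_COMPLETE 1  /* hypothetical stand-in for DBPF_OP_COMPLETE */

struct op {
    int (*svc_fn)(struct op *op);
};

int service_op(struct op *op)
{
    int ret;

    /* Preconditions go at the top of the function. */
    assert(op != NULL);
    assert(op->svc_fn != NULL);

    ret = op->svc_fn(op);
    /* Check the result immediately after the call, with an expression
     * that explains itself if it fails (unlike assert(0)). */
    assert(ret == OP_COMPLETE || ret < 0);
    return ret;
}
```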

+            }
             if(ret < 0)
             {
-                return ret; /* not sure how to recover from failure here */
+                gossip_err("Error: dbpf_sync_coalesce ret < 0 ! \n");
+                     /* not sure how to recover from failure here */
             }
         }
-        else
+        else if (ret == DBPF_OP_COMPLETE || ret < 0)
         {
-
-#ifndef __PVFS2_TROVE_AIO_THREADED__
-            /*
-              check if trove is telling us to NOT mark this as
-              completed, and also to NOT re-add it to the service
-              queue.  this can happen if trove is throttling I/O
-              internally and will handle re-starting the operation
-              without our help.
-            */
-            if (cur_op->op.state == OP_INTERNALLY_DELAYED)
-            {
-                continue;
-            }
-#endif
-            assert(cur_op->op.state != OP_COMPLETED);
-            dbpf_queued_op_queue(cur_op);
+            dbpf_move_op_to_completion_queue(
+                cur_op, ((ret == 1) ? 0 : ret), OP_COMPLETED);
+        }else{
+            assert(0);
         }
+    }

-    } while(--max_num_ops_to_service);
-#endif
-
-    return 0;
+ gossip_debug(GOSSIP_TROVE_DEBUG, "dbpf_meta_thread_function \"%s\" ending\n",thread_type);
+    return ptr;
 }
+
+

 /*
  * Local variables:

Index: dbpf-thread.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf-thread.h,v
diff -p -u -r1.9 -r1.9.6.1
--- dbpf-thread.h       5 Jun 2006 19:57:27 -0000       1.9
+++ dbpf-thread.h       20 Jul 2006 18:12:07 -0000      1.9.6.1
@@ -14,35 +14,13 @@ extern "C" {
 #include "trove.h"
 #include "dbpf.h"

-#define DBPF_OPS_PER_WORK_CYCLE 5
-
 int dbpf_thread_initialize(void);

 int dbpf_thread_finalize(void);

 void *dbpf_thread_function(void *ptr);
+void *dbpf_background_file_removal_thread_function(void *ptr);

-int dbpf_do_one_work_cycle(int *out_count);
-
-#define move_op_to_completion_queue(cur_op, ret_state, end_state)  \
-do { TROVE_context_id cid = cur_op->op.context_id;                 \
-cur_op->state = ret_state;                                         \
-context_mutex = dbpf_completion_queue_array_mutex[cid];            \
-assert(context_mutex);                                             \
-/*                                                                 \
-  it's important to atomically place the op in the completion      \
-  queue and change the op state to 'end_state' so that dspace_test \
-  and dspace_testcontext play nicely together                      \
-*/                                                                 \
-gen_mutex_lock(context_mutex);                                     \
-dbpf_op_queue_add(dbpf_completion_queue_array[cid],cur_op);        \
-gen_mutex_lock(&cur_op->mutex);                                    \
-cur_op->op.state = end_state;                                      \
-gen_mutex_unlock(&cur_op->mutex);                                  \
-/* wake up one waiting thread, if any */                           \
-pthread_cond_signal(&dbpf_op_completed_cond);                      \
-gen_mutex_unlock(context_mutex);                                   \
-} while(0)

 #if defined(__cplusplus)
 }

Index: dbpf.h
===================================================================
RCS file: /projects/cvsroot/pvfs2/src/io/trove/trove-dbpf/dbpf.h,v
diff -p -u -r1.74 -r1.74.2.1
--- dbpf.h      18 Jul 2006 16:09:23 -0000      1.74
+++ dbpf.h      20 Jul 2006 18:12:07 -0000      1.74.2.1
@@ -30,16 +30,24 @@ extern "C" {
 #define TROVE_DB_DIRTY_READ             0
 #endif /* HAVE_DB_DIRTY_READ */

-#ifdef __PVFS2_TROVE_THREADED__
-#define TROVE_DB_THREAD DB_THREAD
+
+#ifdef HAVE_TROVE_TRANSACTION_SUPPORT
+#define COLL_ENV_FLAGS (DB_INIT_MPOOL | DB_CREATE | DB_THREAD | DB_INIT_TXN)
 #else
-#define TROVE_DB_THREAD         0
-#endif /* __PVFS2_TROVE_THREADED__ */
+#define COLL_ENV_FLAGS (DB_INIT_MPOOL | DB_CREATE | DB_THREAD)
+#endif

 #define TROVE_DB_MODE 0644
 #define TROVE_DB_TYPE DB_BTREE
-#define TROVE_DB_OPEN_FLAGS (TROVE_DB_DIRTY_READ | TROVE_DB_THREAD)
-#define TROVE_DB_CREATE_FLAGS (DB_CREATE | TROVE_DB_OPEN_FLAGS)
+
+#ifdef HAVE_TROVE_TRANSACTION_SUPPORT
+#define TROVE_DB_OPEN_TXN DB_AUTO_COMMIT
+#else
+#define TROVE_DB_OPEN_TXN 0
+#endif
+
+#define TROVE_DB_OPEN_FLAGS (TROVE_DB_DIRTY_READ | DB_THREAD)
+#define TROVE_DB_CREATE_FLAGS (DB_CREATE | TROVE_DB_OPEN_FLAGS | TROVE_DB_OPEN_TXN)

 /*
   for more efficient host filesystem accesses, we have a simple
@@ -74,16 +82,8 @@ extern "C" {
 do { snprintf(__buf, __path_max, "/%s", __stoname); } while (0)

 #define STO_ATTRIB_DBNAME "storage_attributes.db"
-#define DBPF_GET_STO_ATTRIB_DBNAME(__buf, __path_max, __stoname) \
-do { \
-    snprintf(__buf, __path_max, "/%s/%s", __stoname, STO_ATTRIB_DBNAME); \
-} while (0)

 #define COLLECTIONS_DBNAME "collections.db"
-#define DBPF_GET_COLLECTIONS_DBNAME(__buf, __path_max, __stoname) \
-do { \
-    snprintf(__buf, __path_max, "/%s/%s", __stoname, COLLECTIONS_DBNAME); \
-} while (0)

 #define DBPF_GET_COLL_DIRNAME(__buf, __path_max, __stoname, __collid) \
 do { \
@@ -119,6 +119,27 @@ do {
                  __stoname, __collid, STRANDED_BSTREAM_DIRNAME); \
     } while(0)

+#define SHADOW_REMOVE_BSTREAM_DIRNAME "removable-bstreams"
+#define DBPF_GET_SHADOW_REMOVE_BSTREAM_DIRNAME(                  \
+        __buf, __path_max, __stoname )                           \
+do {                                                             \
+    snprintf(__buf, __path_max,                                  \
+        "/%s/%s",                                                \
+        __stoname,                                               \
+       SHADOW_REMOVE_BSTREAM_DIRNAME);                           \
+} while(0)
+
+#define DBPF_GET_SHADOW_REMOVE_BSTREAM_FILENAME(                 \
+        __buf, __path_max, __stoname, __collid, __handle, __rand)\
+do {                                                             \
+    snprintf(__buf, __path_max,                                  \
+        "/%s/%s/%08x_%.8llu_%08llx.%d",                          \
+        __stoname, SHADOW_REMOVE_BSTREAM_DIRNAME, __collid,      \
+       llu(DBPF_BSTREAM_GET_BUCKET(__handle, __collid)),         \
+       llu(__handle), __rand);            \
+} while(0)
+
+
 /* arguments are: buf, path_max, stoname, collid, handle */
 #define DBPF_GET_BSTREAM_FILENAME(__b, __pm, __stoname, __cid, __handle) \
 do { \
@@ -170,6 +191,7 @@ struct dbpf_storage
     TROVE_ds_flags flags;
     int refct;
     char *name;
+    DB_ENV * env;
     DB *sto_attr_db;
     DB *coll_db;
 };
@@ -194,7 +216,7 @@ struct dbpf_collection

     int c_low_watermark;
     int c_high_watermark;
-    int meta_sync_enabled;
+    int meta_sync_mode;
     /*
      * If this option is on we don't queue ops or use threads.
      */
@@ -403,7 +425,7 @@ enum dbpf_op_type
#define DBPF_OP_IS_KEYVAL(__type) (__type >= KEYVAL_READ && __type < DSPACE_CREATE)
 #define DBPF_OP_IS_DSPACE(__type) (__type >= DSPACE_CREATE)

-#define DBPF_OP_DOES_SYNC(__op)    \
+#define DBPF_OP_MODIFIYING_META_OP(__op)    \
     (__op == KEYVAL_WRITE       || \
      __op == KEYVAL_REMOVE_KEY  || \
      __op == KEYVAL_WRITE_LIST  || \
@@ -411,6 +433,7 @@ enum dbpf_op_type
      __op == DSPACE_REMOVE      || \
      __op == DSPACE_SETATTR)

+
 /*
   a function useful for debugging that returns a human readable
   op_type name given an op_type; returns NULL if no match is found
@@ -426,24 +449,27 @@ enum dbpf_op_state
     OP_COMPLETED,
     OP_DEQUEUED,
     OP_CANCELED,
-    OP_INTERNALLY_DELAYED,
-    OP_SYNC_QUEUED
+    OP_INTERNALLY_DELAYED
 };

 #define DBPF_OP_CONTINUE 0
 #define DBPF_OP_COMPLETE 1
-#define DBPF_OP_NEEDS_SYNC 2

 /* Used to store parameters for queued operations */
 struct dbpf_op
 {
     enum dbpf_op_type type;
     enum dbpf_op_state state;
+
+    gen_mutex_t state_mutex;
+
     TROVE_handle handle;
     TROVE_op_id id;
+
     struct dbpf_collection *coll_p;
     int (*svc_fn)(struct dbpf_op *op);
     void *user_ptr;
+
     TROVE_ds_flags flags;
     TROVE_context_id context_id;
     union
@@ -488,9 +514,11 @@ PVFS_error dbpf_db_error_to_trove_error(
 #define DBPF_READ   read
 #define DBPF_CLOSE  close
 #define DBPF_UNLINK unlink
-#define DBPF_SYNC   fsync
+#define DBPF_SYNC   fdatasync
 #define DBPF_RESIZE ftruncate
 #define DBPF_FSTAT  fstat
+#define DBPF_RENAME rename
+#define DBPF_MKDIR  mkdir

 #define DBPF_AIO_SYNC_IF_NECESSARY(dbpf_op_ptr, fd, ret)  \
 do {                                                      \
@@ -514,7 +542,8 @@ do {
 #define DBPF_ERROR_SYNC_IF_NECESSARY(dbpf_op_ptr, fd)    \
 do {                                                     \
     int tmp_ret, tmp_errno;                              \
-    if (dbpf_op_ptr->flags & TROVE_SYNC)                 \
+    if ((dbpf_op_ptr->flags & TROVE_SYNC) &&                  \
+      dbpf_op_ptr->coll_p->meta_sync_mode == TROVE_SYNC_MODE )\
     {                                                    \
         if ((tmp_ret = DBPF_SYNC(fd)) != 0)              \
         {                                                \
@@ -535,7 +564,8 @@ do {
 do {                                                          \
     int tmp_ret;                                              \
     ret = 0;                                                  \
-    if (dbpf_op_ptr->flags & TROVE_SYNC)                      \
+    if ((dbpf_op_ptr->flags & TROVE_SYNC) &&                  \
+      dbpf_op_ptr->coll_p->meta_sync_mode == TROVE_SYNC_MODE )\
     {                                                         \
         if ((tmp_ret = db_ptr->sync(db_ptr, 0)) != 0)         \
         {                                                     \

_______________________________________________
Pvfs2-cvs mailing list
[EMAIL PROTECTED]
http://www.beowulf-underground.org/mailman/listinfo/pvfs2-cvs


_______________________________________________
Pvfs2-developers mailing list
[email protected]
http://www.beowulf-underground.org/mailman/listinfo/pvfs2-developers
