Hi Gang,

At present, when sending an AST or BAST fails, the code already prints
ERROR logs.

I admit that it is truly a corner case, but it becomes a fatal error when
the network is unreliable.

If the AST is never sent back to the requesting node, the related procedure
will stay pending, and the whole cluster may even hang.

I don't think that is acceptable in enterprise-level usage scenarios.
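
To make the hang concrete, here is a minimal, self-contained sketch of the
requester-side wait-for-AST pattern. It is not the actual ocfs2 dlm call
chain; struct lock_request, ast_callback() and wait_for_ast() are
hypothetical names used only for illustration:

#include <stdbool.h>

struct lock_request {
	volatile bool ast_fired;	/* set by the AST callback */
};

/* Invoked when the lock master's AST message arrives. */
static void ast_callback(struct lock_request *req)
{
	req->ast_fired = true;
}

/*
 * The requester blocks here until the AST arrives.  If the master's AST
 * message is lost on the wire and never re-sent, this loop (a wait queue
 * in the real code) never terminates, and every later request queued
 * behind the lock is stuck as well -- which is how a single lost message
 * can stall the whole cluster.
 */
static void wait_for_ast(struct lock_request *req)
{
	while (!req->ast_fired)
		;	/* busy-wait stands in for the real wait queue */
}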

Thanks,

Changwei



On 2017/8/7 15:43, Gang He wrote:
> Based on your description, this case should be a corner case, NOT a fatal
> error.
> Should we use mlog(ML_NOTICE, ...) to print these logs?
>
> Thanks
> Gang
>
>
>> Hi,
>>
>> In the current code, while flushing ASTs, we don't handle the case where
>> sending an AST or BAST fails.
>> But it is indeed possible that an AST or BAST is lost due to some kind of
>> network fault.
>>
>> If that exception happens, the requesting node will never receive the AST,
>> and hence will never acquire the lock or abort the current locking attempt.
>>
>> With this patch, I'd like to fix this issue by re-queuing the AST or BAST
>> when sending fails due to a network fault.
>>
>> The re-queued AST or BAST will be dropped if the requesting node is dead.
>>
>> This will improve reliability a lot.
>>
>>
>> Thanks.
>>
>> Changwei.
>>
>> Signed-off-by: Changwei Ge <ge.chang...@h3c.com>
>> ---
>>  fs/ocfs2/dlm/dlmrecovery.c |   51 ++++++++++++++++++++++++++++++++++++++++++--
>>  fs/ocfs2/dlm/dlmthread.c   |   39 +++++++++++++++++++++++++++------
>>  2 files changed, 81 insertions(+), 9 deletions(-)
>>
>> diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
>> index 74407c6..ddfaf74 100644
>> --- a/fs/ocfs2/dlm/dlmrecovery.c
>> +++ b/fs/ocfs2/dlm/dlmrecovery.c
>> @@ -2263,11 +2263,45 @@ static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
>>      }
>>  }
>>  
>> +static int dlm_drop_pending_ast_bast(struct dlm_ctxt *dlm,
>> +                     struct dlm_lock *lock)
>> +{
>> +    int reserved = 0;
>> +
>> +    spin_lock(&dlm->ast_lock);
>> +    if (!list_empty(&lock->ast_list)) {
>> +        mlog(0, "%s: drop pending AST for lock(cookie=%u:%llu).\n",
>> +             dlm->name,
>> +             dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
>> +             dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
>> +        list_del_init(&lock->ast_list);
>> +        lock->ast_pending = 0;
>> +        dlm_lock_put(lock);
>> +        reserved++;
>> +    }
>> +
>> +    if (!list_empty(&lock->bast_list)) {
>> +        mlog(0, "%s: drop pending BAST for lock(cookie=%u:%llu).\n",
>> +             dlm->name,
>> +             dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
>> +             dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
>> +        list_del_init(&lock->bast_list);
>> +        lock->bast_pending = 0;
>> +        dlm_lock_put(lock);
>> +        reserved++;
>> +    }
>> +    spin_unlock(&dlm->ast_lock);
>> +
>> +    return reserved;
>> +}
>> +
>>  static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
>> -                struct dlm_lock_resource *res, u8 dead_node)
>> +                struct dlm_lock_resource *res, u8 dead_node,
>> +                int *reserved)
>>  {
>>      struct dlm_lock *lock, *next;
>>      unsigned int freed = 0;
>> +    int reserved_tmp = 0;
>>  
>>      /* this node is the lockres master:
>>       * 1) remove any stale locks for the dead node
>> @@ -2284,6 +2318,9 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
>>          if (lock->ml.node == dead_node) {
>>              list_del_init(&lock->list);
>>              dlm_lock_put(lock);
>> +
>> +            reserved_tmp += dlm_drop_pending_ast_bast(dlm, lock);
>> +
>>              /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
>>              dlm_lock_put(lock);
>>              freed++;
>> @@ -2293,6 +2330,9 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
>>          if (lock->ml.node == dead_node) {
>>              list_del_init(&lock->list);
>>              dlm_lock_put(lock);
>> +
>> +            reserved_tmp += dlm_drop_pending_ast_bast(dlm, lock);
>> +
>>              /* Can't schedule DLM_UNLOCK_FREE_LOCK - do manually */
>>              dlm_lock_put(lock);
>>              freed++;
>> @@ -2308,6 +2348,8 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
>>          }
>>      }
>>  
>> +    *reserved = reserved_tmp;
>> +
>>      if (freed) {
>>          mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
>>               "dropping ref from lockres\n", dlm->name,
>> @@ -2367,6 +2409,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
>>      for (i = 0; i < DLM_HASH_BUCKETS; i++) {
>>          bucket = dlm_lockres_hash(dlm, i);
>>          hlist_for_each_entry_safe(res, tmp, bucket, hash_node) {
>> +            int reserved = 0;
>>               /* always prune any $RECOVERY entries for dead nodes,
>>                * otherwise hangs can occur during later recovery */
>>              if (dlm_is_recovery_lock(res->lockname.name,
>> @@ -2420,7 +2463,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
>>                      continue;
>>                  }
>>              } else if (res->owner == dlm->node_num) {
>> -                dlm_free_dead_locks(dlm, res, dead_node);
>> +                dlm_free_dead_locks(dlm, res, dead_node, &reserved);
>>                  __dlm_lockres_calc_usage(dlm, res);
>>              } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
>>                  if (test_bit(dead_node, res->refmap)) {
>> @@ -2432,6 +2475,10 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
>>                  }
>>              }
>>              spin_unlock(&res->spinlock);
>> +            while (reserved) {
>> +                dlm_lockres_release_ast(dlm, res);
>> +                reserved--;
>> +            }
>>          }
>>      }
>>  
>> diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
>> index 838a06d..c34a619 100644
>> --- a/fs/ocfs2/dlm/dlmthread.c
>> +++ b/fs/ocfs2/dlm/dlmthread.c
>> @@ -587,13 +587,13 @@ static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
>>  
>>  static void dlm_flush_asts(struct dlm_ctxt *dlm)
>>  {
>> -    int ret;
>> +    int ret = 0;
>>      struct dlm_lock *lock;
>>      struct dlm_lock_resource *res;
>>      u8 hi;
>>  
>>      spin_lock(&dlm->ast_lock);
>> -    while (!list_empty(&dlm->pending_asts)) {
>> +    while (!list_empty(&dlm->pending_asts) && !ret) {
>>          lock = list_entry(dlm->pending_asts.next,
>>                    struct dlm_lock, ast_list);
>>          /* get an extra ref on lock */
>> @@ -628,8 +628,20 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>>              mlog(0, "%s: res %.*s, AST queued while flushing last "
>>                   "one\n", dlm->name, res->lockname.len,
>>                   res->lockname.name);
>> -        } else
>> -            lock->ast_pending = 0;
>> +        } else {
>> +            if (unlikely(ret < 0)) {
>> +                /* If this AST is not sent back successfully,
>> +                 * there is no chance that the second lock
>> +                 * request comes.
>> +                 */
>> +                spin_lock(&res->spinlock);
>> +                __dlm_lockres_reserve_ast(res);
>> +                spin_unlock(&res->spinlock);
>> +                __dlm_queue_ast(dlm, lock);
>> +            } else {
>> +                lock->ast_pending = 0;
>> +            }
>> +        }
>>  
>>          /* drop the extra ref.
>>           * this may drop it completely. */
>> @@ -637,7 +649,9 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>>          dlm_lockres_release_ast(dlm, res);
>>      }
>>  
>> -    while (!list_empty(&dlm->pending_basts)) {
>> +    ret = 0;
>> +
>> +    while (!list_empty(&dlm->pending_basts) && !ret) {
>>          lock = list_entry(dlm->pending_basts.next,
>>                    struct dlm_lock, bast_list);
>>          /* get an extra ref on lock */
>> @@ -650,7 +664,6 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>>          spin_lock(&lock->spinlock);
>>          BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
>>          hi = lock->ml.highest_blocked;
>> -        lock->ml.highest_blocked = LKM_IVMODE;
>>          spin_unlock(&lock->spinlock);
>>  
>>          /* remove from list (including ref) */
>> @@ -681,7 +694,19 @@ static void dlm_flush_asts(struct dlm_ctxt *dlm)
>>                   "one\n", dlm->name, res->lockname.len,
>>                   res->lockname.name);
>>          } else
>> -            lock->bast_pending = 0;
>> +            if (unlikely(ret)) {
>> +                spin_lock(&res->spinlock);
>> +                __dlm_lockres_reserve_ast(res);
>> +                spin_unlock(&res->spinlock);
>> +                __dlm_queue_bast(dlm, lock);
>> +            } else {
>> +                lock->bast_pending = 0;
>> +                /* Set ::highest_blocked to invalid after
>> +                 * sending BAST successfully so that
>> +                 * no more BAST would be queued.
>> +                 */
>> +                lock->ml.highest_blocked = LKM_IVMODE;
>> +            }
>>  
>>          /* drop the extra ref.
>>           * this may drop it completely. */
>> -- 
>> 1.7.9.5
>>


_______________________________________________
Ocfs2-devel mailing list
Ocfs2-devel@oss.oracle.com
https://oss.oracle.com/mailman/listinfo/ocfs2-devel
