Hi Parath,

I am glad to see that you followed my suggestion to solve the issue by
introducing a refcnt in the tipc_subscription struct. I think this is the
only appropriate method to fully fix the issues we met before. In fact, we
have tried several times to resolve the various issues that occurred in
this part, but unfortunately each attempt either introduced other side
effects or left other problems unresolved. This time, I hope we can
repair the topology server thoroughly.

As the life cycles of both tipc_subscription and tipc_subscriber objects
are now managed with refcnt, we should follow the generic principles
of refcnt usage:
- Rule1: Acquire a refcnt on an object before it is used, and drop that
refcnt once the object is no longer used.
- Rule2: We can safely touch objects tracked with refcnt from various
different asynchronous contexts.

Therefore, it's unnecessary for us to use del_timer_sync() to
synchronously delete the subscription timer at all. Instead, if you
delete the subscription timer asynchronously, the code will become pretty
simple and easy to read.


On 01/11/2017 08:19 PM, Parthasarathy Bhuvaragan wrote:
> Until now, the subscriptions are deleted at cancel or subscriber
> delete by checking for pending timer using del_timer().
> del_timer() is not SMP safe, if on CPU0 the check for pending timer
> returns true but CPU1 might schedule the timer callback thereby
> deleting the subscription. Thus when CPU0 proceeds to delete the
> invalid subscription.
>
> In this commit, we do the following updates to fix it:
> 1. perform refcount per subscription in addition to subscriber.
> 2. use SMP safe del_timer_sync() to remove pending timers.
>
> Signed-off-by: Parthasarathy Bhuvaragan 
> <parthasarathy.bhuvara...@ericsson.com>
> ---
>  net/tipc/subscr.c | 116 
> ++++++++++++++++++++++++++++++++----------------------
>  net/tipc/subscr.h |   1 +
>  2 files changed, 69 insertions(+), 48 deletions(-)
>
> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
> index 0dd02244e21d..d3f450037ad5 100644
> --- a/net/tipc/subscr.c
> +++ b/net/tipc/subscr.c
> @@ -54,6 +54,7 @@ struct tipc_subscriber {
>
>  static void tipc_subscrp_delete(struct tipc_subscription *sub);
>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
> +static void tipc_subscrp_put(struct tipc_subscription *subscription);
>
>  /**
>   * htohl - convert value to endianness used by destination
> @@ -143,19 +144,14 @@ static void tipc_subscrp_timeout(unsigned long data)
>       tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
>                               TIPC_SUBSCR_TIMEOUT, 0, 0);
>
> -     spin_lock_bh(&subscriber->lock);
> -     tipc_subscrp_delete(sub);
> -     spin_unlock_bh(&subscriber->lock);
> -
> +     tipc_nametbl_unsubscribe(sub);
> +     tipc_subscrp_put(sub);
>       tipc_subscrb_put(subscriber);
>  }
>
>  static void tipc_subscrb_kref_release(struct kref *kref)
>  {
> -     struct tipc_subscriber *subcriber = container_of(kref,
> -                                         struct tipc_subscriber, kref);
> -
> -     kfree(subcriber);
> +     kfree(container_of(kref,struct tipc_subscriber, kref));
>  }
>
>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
> @@ -168,6 +164,58 @@ static void tipc_subscrb_get(struct tipc_subscriber 
> *subscriber)
>       kref_get(&subscriber->kref);
>  }
>
> +static void tipc_subscrp_kref_release(struct kref *kref)
> +{
> +     struct tipc_subscription *sub;
> +     struct tipc_net *tn;
> +
> +     sub = container_of(kref, struct tipc_subscription, kref);
> +     tn = net_generic(sub->net, tipc_net_id);
> +
> +     spin_lock_bh(&sub->subscriber->lock);
> +     list_del(&sub->subscrp_list);
> +     atomic_dec(&tn->subscription_count);
> +     spin_unlock_bh(&sub->subscriber->lock);
> +     kfree(sub);
> +}
> +
> +static void tipc_subscrp_put(struct tipc_subscription *subscription)
> +{
> +     kref_put(&subscription->kref, tipc_subscrp_kref_release);
> +}
> +
> +static void tipc_subscrp_get(struct tipc_subscription *subscription)
> +{
> +     kref_get(&subscription->kref);
> +}
> +
> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
> + * subscriptions for a given subscriber.
> + */
> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
> +                                     struct tipc_subscr *s)
> +{
> +     struct tipc_subscription *sub, *temp;
> +
> +     spin_lock_bh(&subscriber->lock);
> +     list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> +                              subscrp_list) {
> +             if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
> +                     continue;
> +             /* Delete subscriptions with pending timers */
> +             if (timer_pending(&sub->timer)) {
> +                     tipc_subscrp_get(sub);

If you follow my suggestion, it's unnecessary to temporarily grab the sub
refcnt at all, and the code here will be much simpler.

> +                     spin_unlock_bh(&subscriber->lock);
> +                     tipc_subscrp_delete(sub);
> +                     tipc_subscrp_put(sub);
> +                     spin_lock_bh(&subscriber->lock);
> +             }
> +             if (s)
> +                     break;
> +     }
> +     spin_unlock_bh(&subscriber->lock);
> +}
> +
>  static struct tipc_subscriber *tipc_subscrb_create(int conid)
>  {
>       struct tipc_subscriber *subscriber;
> @@ -177,8 +225,8 @@ static struct tipc_subscriber *tipc_subscrb_create(int 
> conid)
>               pr_warn("Subscriber rejected, no memory\n");
>               return NULL;
>       }
> -     kref_init(&subscriber->kref);
>       INIT_LIST_HEAD(&subscriber->subscrp_list);
> +     kref_init(&subscriber->kref);
>       subscriber->conid = conid;
>       spin_lock_init(&subscriber->lock);
>
> @@ -187,55 +235,26 @@ static struct tipc_subscriber *tipc_subscrb_create(int 
> conid)
>
>  static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
>  {
> -     struct tipc_subscription *sub, *temp;
> -     u32 timeout;
> -
> -     spin_lock_bh(&subscriber->lock);
> -     /* Destroy any existing subscriptions for subscriber */
> -     list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> -                              subscrp_list) {
> -             timeout = htohl(sub->evt.s.timeout, sub->swap);
> -             if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) {
> -                     tipc_subscrp_delete(sub);
> -                     tipc_subscrb_put(subscriber);
> -             }
> -     }
> -     spin_unlock_bh(&subscriber->lock);
> -
> +     tipc_subscrb_subscrp_delete(subscriber, NULL);
>       tipc_subscrb_put(subscriber);
>  }
>
>  static void tipc_subscrp_delete(struct tipc_subscription *sub)
>  {
> -     struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
> +     struct tipc_subscriber *subscriber = sub->subscriber;
> +     u32 timeout = htohl(sub->evt.s.timeout, sub->swap);
>
> -     tipc_nametbl_unsubscribe(sub);
> -     list_del(&sub->subscrp_list);
> -     kfree(sub);
> -     atomic_dec(&tn->subscription_count);
> +     if (timeout == TIPC_WAIT_FOREVER || del_timer_sync(&sub->timer)) {
> +             tipc_nametbl_unsubscribe(sub);
> +             tipc_subscrp_put(sub);
> +             tipc_subscrb_put(subscriber);

I suggest moving tipc_nametbl_unsubscribe() into
tipc_subscrp_kref_release().

If so, the code here becomes very simple:

tipc_subscrp_delete()
{
        del_timer(&sub->timer);
        tipc_subscrp_put(sub);
        tipc_subscrb_put(subscriber);
}

> +     }
>  }
>
>  static void tipc_subscrp_cancel(struct tipc_subscr *s,
>                               struct tipc_subscriber *subscriber)
>  {
> -     struct tipc_subscription *sub, *temp;
> -     u32 timeout;
> -
> -     spin_lock_bh(&subscriber->lock);
> -     /* Find first matching subscription, exit if not found */
> -     list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
> -                              subscrp_list) {
> -             if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
> -                     timeout = htohl(sub->evt.s.timeout, sub->swap);
> -                     if ((timeout == TIPC_WAIT_FOREVER) ||
> -                         del_timer(&sub->timer)) {
> -                             tipc_subscrp_delete(sub);
> -                             tipc_subscrb_put(subscriber);
> -                     }
> -                     break;
> -             }
> -     }
> -     spin_unlock_bh(&subscriber->lock);
> +     tipc_subscrb_subscrp_delete(subscriber, s);
>  }
>
>  static struct tipc_subscription *tipc_subscrp_create(struct net *net,
> @@ -272,6 +291,7 @@ static struct tipc_subscription 
> *tipc_subscrp_create(struct net *net,
>       sub->swap = swap;
>       memcpy(&sub->evt.s, s, sizeof(*s));
>       atomic_inc(&tn->subscription_count);
> +     kref_init(&sub->kref);
>       return sub;
>  }
>
> @@ -288,9 +308,9 @@ static void tipc_subscrp_subscribe(struct net *net, 
> struct tipc_subscr *s,
>
>       spin_lock_bh(&subscriber->lock);
>       list_add(&sub->subscrp_list, &subscriber->subscrp_list);
> -     tipc_subscrb_get(subscriber);
>       sub->subscriber = subscriber;
>       tipc_nametbl_subscribe(sub);
> +     tipc_subscrb_get(subscriber);
>       spin_unlock_bh(&subscriber->lock);
>
>       timeout = htohl(sub->evt.s.timeout, swap);

Before sub->timer is started, we should first grab a refcnt on sub.

Moreover, I suggest that it's unnecessary to treat TIPC_WAIT_FOREVER as a
special case. In other words, no matter what the timeout of sub is, we
should first grab the sub refcnt and set up its timer. But if the timeout
is TIPC_WAIT_FOREVER, we don't call mod_timer().

This method brings us the benefit that we don't need to check whether the
timeout is TIPC_WAIT_FOREVER at all when sub is deleted.

> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
> index be60103082c9..ffdc214c117a 100644
> --- a/net/tipc/subscr.h
> +++ b/net/tipc/subscr.h
> @@ -57,6 +57,7 @@ struct tipc_subscriber;
>   * @evt: template for events generated by subscription
>   */
>  struct tipc_subscription {
> +     struct kref kref;
>       struct tipc_subscriber *subscriber;
>       struct net *net;
>       struct timer_list timer;
>

Anyway, if you have any questions about my suggestions, I can create a
patch based on your version to show how my idea works.

Thanks,
Ying


------------------------------------------------------------------------------
Developer Access Program for Intel Xeon Phi Processors
Access to Intel Xeon Phi processor-based developer platforms.
With one year of Intel Parallel Studio XE.
Training and support from Colfax.
Order your platform today. http://sdm.link/xeonphi
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to