On 01/15/2017 12:23 PM, Ying Xue wrote:
> Hi Parath,
>
> I am glad to see you to follow my suggestion to solve the issue by
> introducing refcnt tipc_subscription struct. I think this is an only
> appropriate method to perfectly fix issues we met before. In fact we had
> tried several times to resolve kinds of issues happened in this part.
> But unfortunately there are still other side effects introduced or other
> problems remained. In this time, I hope we can drastically repair
> topology server.
>
> As the life cycle of both tipc_subscription and tipc_subscriber objects
> have been managed with refcnt, we should follow the generic principles
> of refcnt usage:
> - Rule1: Acquire refcnt before its object is used, and decrease its
> refcnt after its object is released
> - Rule2: We can safely touch objects tracked with refnct in kinds of
> different asynchronous contexts.
>
> Therefore, it's unnecessary for us to use del_timer_sync() to
> synchronously delete subscription timer at all. Instead, if you
> asynchronously delete subscription timer, the code will becomes pretty
> simple and easily readable.
>
Hi Ying,

Thanks for the valuable comments. I will post a patch with your comments 
soon, the code looks much cleaner with your approach.

/Partha
>
> On 01/11/2017 08:19 PM, Parthasarathy Bhuvaragan wrote:
>> Until now, the subscriptions are deleted at cancel or subscriber
>> delete by checking for pending timer using del_timer().
>> del_timer() is not SMP safe, if on CPU0 the check for pending timer
>> returns true but CPU1 might schedule the timer callback thereby
>> deleting the subscription. Thus when CPU0 proceeds to delete the
>> invalid subscription.
>>
>> In this commit, we do the following updates to fix it:
>> 1. perform refcount per subscription in addition to subscriber.
>> 2. use SMP safe del_timer_sync() to remove pending timers.
>>
>> Signed-off-by: Parthasarathy Bhuvaragan
>> <parthasarathy.bhuvara...@ericsson.com>
>> ---
>>  net/tipc/subscr.c | 116
>> ++++++++++++++++++++++++++++++++----------------------
>>  net/tipc/subscr.h |   1 +
>>  2 files changed, 69 insertions(+), 48 deletions(-)
>>
>> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
>> index 0dd02244e21d..d3f450037ad5 100644
>> --- a/net/tipc/subscr.c
>> +++ b/net/tipc/subscr.c
>> @@ -54,6 +54,7 @@ struct tipc_subscriber {
>>
>>  static void tipc_subscrp_delete(struct tipc_subscription *sub);
>>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
>> +static void tipc_subscrp_put(struct tipc_subscription *subscription);
>>
>>  /**
>>   * htohl - convert value to endianness used by destination
>> @@ -143,19 +144,14 @@ static void tipc_subscrp_timeout(unsigned long
>> data)
>>      tipc_subscrp_send_event(sub, sub->evt.s.seq.lower,
>> sub->evt.s.seq.upper,
>>                  TIPC_SUBSCR_TIMEOUT, 0, 0);
>>
>> -    spin_lock_bh(&subscriber->lock);
>> -    tipc_subscrp_delete(sub);
>> -    spin_unlock_bh(&subscriber->lock);
>> -
>> +    tipc_nametbl_unsubscribe(sub);
>> +    tipc_subscrp_put(sub);
>>      tipc_subscrb_put(subscriber);
>>  }
>>
>>  static void tipc_subscrb_kref_release(struct kref *kref)
>>  {
>> -    struct tipc_subscriber *subcriber = container_of(kref,
>> -                        struct tipc_subscriber, kref);
>> -
>> -    kfree(subcriber);
>> +    kfree(container_of(kref,struct tipc_subscriber, kref));
>>  }
>>
>>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
>> @@ -168,6 +164,58 @@ static void tipc_subscrb_get(struct
>> tipc_subscriber *subscriber)
>>      kref_get(&subscriber->kref);
>>  }
>>
>> +static void tipc_subscrp_kref_release(struct kref *kref)
>> +{
>> +    struct tipc_subscription *sub;
>> +    struct tipc_net *tn;
>> +
>> +    sub = container_of(kref, struct tipc_subscription, kref);
>> +    tn = net_generic(sub->net, tipc_net_id);
>> +
>> +    spin_lock_bh(&sub->subscriber->lock);
>> +    list_del(&sub->subscrp_list);
>> +    atomic_dec(&tn->subscription_count);
>> +    spin_unlock_bh(&sub->subscriber->lock);
>> +    kfree(sub);
>> +}
>> +
>> +static void tipc_subscrp_put(struct tipc_subscription *subscription)
>> +{
>> +    kref_put(&subscription->kref, tipc_subscrp_kref_release);
>> +}
>> +
>> +static void tipc_subscrp_get(struct tipc_subscription *subscription)
>> +{
>> +    kref_get(&subscription->kref);
>> +}
>> +
>> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
>> + * subscriptions for a given subscriber.
>> + */
>> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber
>> *subscriber,
>> +                    struct tipc_subscr *s)
>> +{
>> +    struct tipc_subscription *sub, *temp;
>> +
>> +    spin_lock_bh(&subscriber->lock);
>> +    list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>> +                 subscrp_list) {
>> +        if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
>> +            continue;
>> +        /* Delete subscriptions with pending timers */
>> +        if (timer_pending(&sub->timer)) {
>> +            tipc_subscrp_get(sub);
>
> If you follow my suggestion, it's unnecessary to temporarily grab sub
> refcnt at all, and here code will be more simpler.
>
>> +            spin_unlock_bh(&subscriber->lock);
>> +            tipc_subscrp_delete(sub);
>> +            tipc_subscrp_put(sub);
>> +            spin_lock_bh(&subscriber->lock);
>> +        }
>> +        if (s)
>> +            break;
>> +    }
>> +    spin_unlock_bh(&subscriber->lock);
>> +}
>> +
>>  static struct tipc_subscriber *tipc_subscrb_create(int conid)
>>  {
>>      struct tipc_subscriber *subscriber;
>> @@ -177,8 +225,8 @@ static struct tipc_subscriber
>> *tipc_subscrb_create(int conid)
>>          pr_warn("Subscriber rejected, no memory\n");
>>          return NULL;
>>      }
>> -    kref_init(&subscriber->kref);
>>      INIT_LIST_HEAD(&subscriber->subscrp_list);
>> +    kref_init(&subscriber->kref);
>>      subscriber->conid = conid;
>>      spin_lock_init(&subscriber->lock);
>>
>> @@ -187,55 +235,26 @@ static struct tipc_subscriber
>> *tipc_subscrb_create(int conid)
>>
>>  static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
>>  {
>> -    struct tipc_subscription *sub, *temp;
>> -    u32 timeout;
>> -
>> -    spin_lock_bh(&subscriber->lock);
>> -    /* Destroy any existing subscriptions for subscriber */
>> -    list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>> -                 subscrp_list) {
>> -        timeout = htohl(sub->evt.s.timeout, sub->swap);
>> -        if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) {
>> -            tipc_subscrp_delete(sub);
>> -            tipc_subscrb_put(subscriber);
>> -        }
>> -    }
>> -    spin_unlock_bh(&subscriber->lock);
>> -
>> +    tipc_subscrb_subscrp_delete(subscriber, NULL);
>>      tipc_subscrb_put(subscriber);
>>  }
>>
>>  static void tipc_subscrp_delete(struct tipc_subscription *sub)
>>  {
>> -    struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
>> +    struct tipc_subscriber *subscriber = sub->subscriber;
>> +    u32 timeout = htohl(sub->evt.s.timeout, sub->swap);
>>
>> -    tipc_nametbl_unsubscribe(sub);
>> -    list_del(&sub->subscrp_list);
>> -    kfree(sub);
>> -    atomic_dec(&tn->subscription_count);
>> +    if (timeout == TIPC_WAIT_FOREVER || del_timer_sync(&sub->timer)) {
>> +        tipc_nametbl_unsubscribe(sub);
>> +        tipc_subscrp_put(sub);
>> +        tipc_subscrb_put(subscriber);
>
> I suggest that tipc_nametbl_unsubscribe() can be moved to
> tipc_subscrp_kref_release().
>
> If so, here is very simple:
>
> tipc_subscrp_delete()
> {
>     del_timer(&sub->timer);
>     tipc_subscrp_put(sub);
>     tipc_subscrb_put(subscriber);
> }
>
>> +    }
>>  }
>>
>>  static void tipc_subscrp_cancel(struct tipc_subscr *s,
>>                  struct tipc_subscriber *subscriber)
>>  {
>> -    struct tipc_subscription *sub, *temp;
>> -    u32 timeout;
>> -
>> -    spin_lock_bh(&subscriber->lock);
>> -    /* Find first matching subscription, exit if not found */
>> -    list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>> -                 subscrp_list) {
>> -        if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
>> -            timeout = htohl(sub->evt.s.timeout, sub->swap);
>> -            if ((timeout == TIPC_WAIT_FOREVER) ||
>> -                del_timer(&sub->timer)) {
>> -                tipc_subscrp_delete(sub);
>> -                tipc_subscrb_put(subscriber);
>> -            }
>> -            break;
>> -        }
>> -    }
>> -    spin_unlock_bh(&subscriber->lock);
>> +    tipc_subscrb_subscrp_delete(subscriber, s);
>>  }
>>
>>  static struct tipc_subscription *tipc_subscrp_create(struct net *net,
>> @@ -272,6 +291,7 @@ static struct tipc_subscription
>> *tipc_subscrp_create(struct net *net,
>>      sub->swap = swap;
>>      memcpy(&sub->evt.s, s, sizeof(*s));
>>      atomic_inc(&tn->subscription_count);
>> +    kref_init(&sub->kref);
>>      return sub;
>>  }
>>
>> @@ -288,9 +308,9 @@ static void tipc_subscrp_subscribe(struct net
>> *net, struct tipc_subscr *s,
>>
>>      spin_lock_bh(&subscriber->lock);
>>      list_add(&sub->subscrp_list, &subscriber->subscrp_list);
>> -    tipc_subscrb_get(subscriber);
>>      sub->subscriber = subscriber;
>>      tipc_nametbl_subscribe(sub);
>> +    tipc_subscrb_get(subscriber);
>>      spin_unlock_bh(&subscriber->lock);
>>
>>      timeout = htohl(sub->evt.s.timeout, swap);
>
> Before sub->timer is started, we should first grab refcnt of sub.
>
> Moreover, I suggest it's unnecessary to deem TIPC_WAIT_FOREVER as a
> special case. In other words, no matter what timeout of sub is, we
> should first grab sub refcnt and setup its timer. But if timeout is
> TIPC_WAIT_FOREVER, we don't call mod_timer().
>
> This method brings us a benefit that we don't identify whether timer is
> TIPC_WAIT_FOREVER at all when sub is deleted.
>
>> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
>> index be60103082c9..ffdc214c117a 100644
>> --- a/net/tipc/subscr.h
>> +++ b/net/tipc/subscr.h
>> @@ -57,6 +57,7 @@ struct tipc_subscriber;
>>   * @evt: template for events generated by subscription
>>   */
>>  struct tipc_subscription {
>> +    struct kref kref;
>>      struct tipc_subscriber *subscriber;
>>      struct net *net;
>>      struct timer_list timer;
>>
>
> Anyway, if you have question about my suggestions, I can create a patch
> based on your version to show how my idea works.
>
> Thanks,
> Ying
>

------------------------------------------------------------------------------
Developer Access Program for Intel Xeon Phi Processors
Access to Intel Xeon Phi processor-based developer platforms.
With one year of Intel Parallel Studio XE.
Training and support from Colfax.
Order your platform today. http://sdm.link/xeonphi
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to