On 01/18/2017 01:07 PM, Parthasarathy Bhuvaragan wrote:
> On 01/18/2017 11:06 AM, Ying Xue wrote:
>> On 01/16/2017 11:41 PM, Parthasarathy Bhuvaragan wrote:
>>> Until now, the subscribers keep track of the subscriptions using
>>> reference count at subscriber level. At subscription cancel or
>>> subscriber delete, we delete the subscription by checking for
>>> pending timer using del_timer(). del_timer() is not SMP safe: even if
>>> the check for a pending timer returns true on CPU0, CPU1 might still
>>> schedule the timer callback, thereby deleting the subscription.
>>> Thus when CPU0 is scheduled, it deletes an invalid subscription.
>>>
>>> We can fix the above issue using either a synchronous timer function
>>> or by introducing subscription reference count. The latter solution
>>> is preferred as it's consistent with the asynchronous design model
>>> used by the rest of the code.
>>>
>>> In this commit, we introduce subscription refcount to avoid deleting
>>> an invalid subscription.
>>>
>>> Signed-off-by: Parthasarathy Bhuvaragan
>>> <parthasarathy.bhuvara...@ericsson.com>
>>> ---
>>> v2: Address comments from Ying Xue, to avoid race conditions by using
>>>     refcount only instead of del_timer_sync().
>>> ---
>>>  net/tipc/subscr.c | 120
>>> +++++++++++++++++++++++++++++-------------------------
>>>  net/tipc/subscr.h |   1 +
>>>  2 files changed, 66 insertions(+), 55 deletions(-)
>>>
>>> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
>>> index 0dd02244e21d..d6c1f35f6518 100644
>>> --- a/net/tipc/subscr.c
>>> +++ b/net/tipc/subscr.c
>>> @@ -54,6 +54,7 @@ struct tipc_subscriber {
>>>
>>>  static void tipc_subscrp_delete(struct tipc_subscription *sub);
>>>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
>>> +static void tipc_subscrp_put(struct tipc_subscription *subscription);
>>>
>>>  /**
>>>   * htohl - convert value to endianness used by destination
>>> @@ -137,25 +138,17 @@ void tipc_subscrp_report_overlap(struct
>>> tipc_subscription *sub, u32 found_lower,
>>>  static void tipc_subscrp_timeout(unsigned long data)
>>>  {
>>>      struct tipc_subscription *sub = (struct tipc_subscription *)data;
>>> -    struct tipc_subscriber *subscriber = sub->subscriber;
>>>
>>>      /* Notify subscriber of timeout */
>>>      tipc_subscrp_send_event(sub, sub->evt.s.seq.lower,
>>> sub->evt.s.seq.upper,
>>>                  TIPC_SUBSCR_TIMEOUT, 0, 0);
>>>
>>> -    spin_lock_bh(&subscriber->lock);
>>> -    tipc_subscrp_delete(sub);
>>> -    spin_unlock_bh(&subscriber->lock);
>>> -
>>> -    tipc_subscrb_put(subscriber);
>>> +    tipc_subscrp_put(sub);
>>>  }
>>>
>>>  static void tipc_subscrb_kref_release(struct kref *kref)
>>>  {
>>> -    struct tipc_subscriber *subcriber = container_of(kref,
>>> -                        struct tipc_subscriber, kref);
>>> -
>>> -    kfree(subcriber);
>>> +    kfree(container_of(kref,struct tipc_subscriber, kref));
>>>  }
>>>
>>>  static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
>>> @@ -168,6 +161,56 @@ static void tipc_subscrb_get(struct
>>> tipc_subscriber *subscriber)
>>>      kref_get(&subscriber->kref);
>>>  }
>>>
>>> +static void tipc_subscrp_kref_release(struct kref *kref)
>>> +{
>>> +    struct tipc_subscription *sub = container_of(kref,
>>> +                             struct tipc_subscription,
>>> +                             kref);
>>> +    struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
>>> +    struct tipc_subscriber *subscriber = sub->subscriber;
>>> +
>>> +    spin_lock_bh(&subscriber->lock);
>>> +    tipc_nametbl_unsubscribe(sub);
>>> +    list_del(&sub->subscrp_list);
>>> +    atomic_dec(&tn->subscription_count);
>>> +    spin_unlock_bh(&subscriber->lock);
>>> +    kfree(sub);
>>> +    tipc_subscrb_put(subscriber);
>>> +}
>>> +
>>> +static void tipc_subscrp_put(struct tipc_subscription *subscription)
>>> +{
>>> +    kref_put(&subscription->kref, tipc_subscrp_kref_release);
>>> +}
>>> +
>>> +static void tipc_subscrp_get(struct tipc_subscription *subscription)
>>> +{
>>> +    kref_get(&subscription->kref);
>>> +}
>>> +
>>> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
>>> + * subscriptions for a given subscriber.
>>> + */
>>> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber
>>> *subscriber,
>>> +                    struct tipc_subscr *s)
>>> +{
>>> +    struct tipc_subscription *sub, *temp;
>>> +
>>> +    spin_lock_bh(&subscriber->lock);
>>> +    list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>>> +                 subscrp_list) {
>>> +        spin_unlock_bh(&subscriber->lock);
>>> +        if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
>>> +            tipc_subscrp_delete(sub);
>>> +            return;
>>> +        } else {
>>> +            tipc_subscrp_delete(sub);
>>> +        }
>>
>> If we change above code as below, it seems more simple:
>>
>> list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>> subscrp_list) {
>>     spin_unlock_bh(&subscriber->lock);
>>     tipc_subscrp_delete(sub);
> Ying,
> In the case of subscription cancel request, I need to find the correct
> subscription and delete only that subscription. If we use the above
> version, we will end up deleting all previous subscriptions in the list
> for the given subscriber.
Ying,
My patch was wrong: as posted, it already deletes all prior subscriptions
for a given subscriber until it reaches the specific subscription.
I will spin a new version with the fix.

/Partha
>
> Thanks for the comments and feedback.
>
> I found the fix for John's issue and will send a new version soon.
>
> /Partha
>>     if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
>>         return;
>>     spin_lock_bh(&subscriber->lock);
>> }
>>
>>
>> Besides the commit, this version is exactly what I expect. It's very good!
>>
>> Thanks for the nice job!
>>
>> Please add my ack-by flag to it.
>>
>> Regards,
>> Ying
>>
>>> +        spin_lock_bh(&subscriber->lock);
>>> +    }
>>> +    spin_unlock_bh(&subscriber->lock);
>>> +}
>>> +
>>>  static struct tipc_subscriber *tipc_subscrb_create(int conid)
>>>  {
>>>      struct tipc_subscriber *subscriber;
>>> @@ -177,8 +220,8 @@ static struct tipc_subscriber
>>> *tipc_subscrb_create(int conid)
>>>          pr_warn("Subscriber rejected, no memory\n");
>>>          return NULL;
>>>      }
>>> -    kref_init(&subscriber->kref);
>>>      INIT_LIST_HEAD(&subscriber->subscrp_list);
>>> +    kref_init(&subscriber->kref);
>>>      subscriber->conid = conid;
>>>      spin_lock_init(&subscriber->lock);
>>>
>>> @@ -187,55 +230,21 @@ static struct tipc_subscriber
>>> *tipc_subscrb_create(int conid)
>>>
>>>  static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
>>>  {
>>> -    struct tipc_subscription *sub, *temp;
>>> -    u32 timeout;
>>> -
>>> -    spin_lock_bh(&subscriber->lock);
>>> -    /* Destroy any existing subscriptions for subscriber */
>>> -    list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>>> -                 subscrp_list) {
>>> -        timeout = htohl(sub->evt.s.timeout, sub->swap);
>>> -        if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) {
>>> -            tipc_subscrp_delete(sub);
>>> -            tipc_subscrb_put(subscriber);
>>> -        }
>>> -    }
>>> -    spin_unlock_bh(&subscriber->lock);
>>> -
>>> +    tipc_subscrb_subscrp_delete(subscriber, NULL);
>>>      tipc_subscrb_put(subscriber);
>>>  }
>>>
>>>  static void tipc_subscrp_delete(struct tipc_subscription *sub)
>>>  {
>>> -    struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
>>> -
>>> -    tipc_nametbl_unsubscribe(sub);
>>> -    list_del(&sub->subscrp_list);
>>> -    kfree(sub);
>>> -    atomic_dec(&tn->subscription_count);
>>> +    if (del_timer(&sub->timer))
>>> +        tipc_subscrp_put(sub);
>>> +    tipc_subscrp_put(sub);
>>>  }
>>>
>>>  static void tipc_subscrp_cancel(struct tipc_subscr *s,
>>>                  struct tipc_subscriber *subscriber)
>>>  {
>>> -    struct tipc_subscription *sub, *temp;
>>> -    u32 timeout;
>>> -
>>> -    spin_lock_bh(&subscriber->lock);
>>> -    /* Find first matching subscription, exit if not found */
>>> -    list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
>>> -                 subscrp_list) {
>>> -        if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
>>> -            timeout = htohl(sub->evt.s.timeout, sub->swap);
>>> -            if ((timeout == TIPC_WAIT_FOREVER) ||
>>> -                del_timer(&sub->timer)) {
>>> -                tipc_subscrp_delete(sub);
>>> -                tipc_subscrb_put(subscriber);
>>> -            }
>>> -            break;
>>> -        }
>>> -    }
>>> -    spin_unlock_bh(&subscriber->lock);
>>> +    tipc_subscrb_subscrp_delete(subscriber, s);
>>>  }
>>>
>>>  static struct tipc_subscription *tipc_subscrp_create(struct net *net,
>>> @@ -272,6 +281,7 @@ static struct tipc_subscription
>>> *tipc_subscrp_create(struct net *net,
>>>      sub->swap = swap;
>>>      memcpy(&sub->evt.s, s, sizeof(*s));
>>>      atomic_inc(&tn->subscription_count);
>>> +    kref_init(&sub->kref);
>>>      return sub;
>>>  }
>>>
>>> @@ -288,17 +298,17 @@ static void tipc_subscrp_subscribe(struct net
>>> *net, struct tipc_subscr *s,
>>>
>>>      spin_lock_bh(&subscriber->lock);
>>>      list_add(&sub->subscrp_list, &subscriber->subscrp_list);
>>> -    tipc_subscrb_get(subscriber);
>>>      sub->subscriber = subscriber;
>>>      tipc_nametbl_subscribe(sub);
>>> +    tipc_subscrb_get(subscriber);
>>>      spin_unlock_bh(&subscriber->lock);
>>>
>>> +    tipc_subscrp_get(sub);
>>> +    setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
>>>      timeout = htohl(sub->evt.s.timeout, swap);
>>> -    if (timeout == TIPC_WAIT_FOREVER)
>>> -        return;
>>>
>>> -    setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
>>> -    mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
>>> +    if (timeout != TIPC_WAIT_FOREVER)
>>> +        mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
>>>  }
>>>
>>>  /* Handle one termination request for the subscriber */
>>> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
>>> index be60103082c9..ffdc214c117a 100644
>>> --- a/net/tipc/subscr.h
>>> +++ b/net/tipc/subscr.h
>>> @@ -57,6 +57,7 @@ struct tipc_subscriber;
>>>   * @evt: template for events generated by subscription
>>>   */
>>>  struct tipc_subscription {
>>> +    struct kref kref;
>>>      struct tipc_subscriber *subscriber;
>>>      struct net *net;
>>>      struct timer_list timer;
>>>
>>

------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, SlashDot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to