Until now, the subscriptions are deleted at cancel or subscriber
delete by checking for pending timer using del_timer().
del_timer() is not SMP safe, if on CPU0 the check for pending timer
returns true but CPU1 might schedule the timer callback thereby
deleting the subscription. Thus when CPU0 proceeds to delete the
invalid subscription.

In this commit, we do the following updates to fix it:
1. perform refcount per subscription in addition to subscriber.
2. use SMP safe del_timer_sync() to remove pending timers.

Signed-off-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvara...@ericsson.com>
---
 net/tipc/subscr.c | 116 ++++++++++++++++++++++++++++++++----------------------
 net/tipc/subscr.h |   1 +
 2 files changed, 69 insertions(+), 48 deletions(-)

diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 0dd02244e21d..d3f450037ad5 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -54,6 +54,7 @@ struct tipc_subscriber {
 
 static void tipc_subscrp_delete(struct tipc_subscription *sub);
 static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
+static void tipc_subscrp_put(struct tipc_subscription *subscription);
 
 /**
  * htohl - convert value to endianness used by destination
@@ -143,19 +144,14 @@ static void tipc_subscrp_timeout(unsigned long data)
        tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
                                TIPC_SUBSCR_TIMEOUT, 0, 0);
 
-       spin_lock_bh(&subscriber->lock);
-       tipc_subscrp_delete(sub);
-       spin_unlock_bh(&subscriber->lock);
-
+       tipc_nametbl_unsubscribe(sub);
+       tipc_subscrp_put(sub);
        tipc_subscrb_put(subscriber);
 }
 
 static void tipc_subscrb_kref_release(struct kref *kref)
 {
-       struct tipc_subscriber *subcriber = container_of(kref,
-                                           struct tipc_subscriber, kref);
-
-       kfree(subcriber);
+       kfree(container_of(kref,struct tipc_subscriber, kref));
 }
 
 static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
@@ -168,6 +164,58 @@ static void tipc_subscrb_get(struct tipc_subscriber 
*subscriber)
        kref_get(&subscriber->kref);
 }
 
+static void tipc_subscrp_kref_release(struct kref *kref)
+{
+       struct tipc_subscription *sub;
+       struct tipc_net *tn;
+
+       sub = container_of(kref, struct tipc_subscription, kref);
+       tn = net_generic(sub->net, tipc_net_id);
+
+       spin_lock_bh(&sub->subscriber->lock);
+       list_del(&sub->subscrp_list);
+       atomic_dec(&tn->subscription_count);
+       spin_unlock_bh(&sub->subscriber->lock);
+       kfree(sub);
+}
+
+static void tipc_subscrp_put(struct tipc_subscription *subscription)
+{
+       kref_put(&subscription->kref, tipc_subscrp_kref_release);
+}
+
+static void tipc_subscrp_get(struct tipc_subscription *subscription)
+{
+       kref_get(&subscription->kref);
+}
+
+/* tipc_subscrb_subscrp_delete - delete a specific subscription or all
+ * subscriptions for a given subscriber.
+ */
+static void tipc_subscrb_subscrp_delete(struct tipc_subscriber *subscriber,
+                                       struct tipc_subscr *s)
+{
+       struct tipc_subscription *sub, *temp;
+
+       spin_lock_bh(&subscriber->lock);
+       list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
+                                subscrp_list) {
+               if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr)))
+                       continue;
+               /* Delete subscriptions with pending timers */
+               if (timer_pending(&sub->timer)) {
+                       tipc_subscrp_get(sub);
+                       spin_unlock_bh(&subscriber->lock);
+                       tipc_subscrp_delete(sub);
+                       tipc_subscrp_put(sub);
+                       spin_lock_bh(&subscriber->lock);
+               }
+               if (s)
+                       break;
+       }
+       spin_unlock_bh(&subscriber->lock);
+}
+
 static struct tipc_subscriber *tipc_subscrb_create(int conid)
 {
        struct tipc_subscriber *subscriber;
@@ -177,8 +225,8 @@ static struct tipc_subscriber *tipc_subscrb_create(int 
conid)
                pr_warn("Subscriber rejected, no memory\n");
                return NULL;
        }
-       kref_init(&subscriber->kref);
        INIT_LIST_HEAD(&subscriber->subscrp_list);
+       kref_init(&subscriber->kref);
        subscriber->conid = conid;
        spin_lock_init(&subscriber->lock);
 
@@ -187,55 +235,26 @@ static struct tipc_subscriber *tipc_subscrb_create(int 
conid)
 
 static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
 {
-       struct tipc_subscription *sub, *temp;
-       u32 timeout;
-
-       spin_lock_bh(&subscriber->lock);
-       /* Destroy any existing subscriptions for subscriber */
-       list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
-                                subscrp_list) {
-               timeout = htohl(sub->evt.s.timeout, sub->swap);
-               if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) {
-                       tipc_subscrp_delete(sub);
-                       tipc_subscrb_put(subscriber);
-               }
-       }
-       spin_unlock_bh(&subscriber->lock);
-
+       tipc_subscrb_subscrp_delete(subscriber, NULL);
        tipc_subscrb_put(subscriber);
 }
 
 static void tipc_subscrp_delete(struct tipc_subscription *sub)
 {
-       struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
+       struct tipc_subscriber *subscriber = sub->subscriber;
+       u32 timeout = htohl(sub->evt.s.timeout, sub->swap);
 
-       tipc_nametbl_unsubscribe(sub);
-       list_del(&sub->subscrp_list);
-       kfree(sub);
-       atomic_dec(&tn->subscription_count);
+       if (timeout == TIPC_WAIT_FOREVER || del_timer_sync(&sub->timer)) {
+               tipc_nametbl_unsubscribe(sub);
+               tipc_subscrp_put(sub);
+               tipc_subscrb_put(subscriber);
+       }
 }
 
 static void tipc_subscrp_cancel(struct tipc_subscr *s,
                                struct tipc_subscriber *subscriber)
 {
-       struct tipc_subscription *sub, *temp;
-       u32 timeout;
-
-       spin_lock_bh(&subscriber->lock);
-       /* Find first matching subscription, exit if not found */
-       list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
-                                subscrp_list) {
-               if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
-                       timeout = htohl(sub->evt.s.timeout, sub->swap);
-                       if ((timeout == TIPC_WAIT_FOREVER) ||
-                           del_timer(&sub->timer)) {
-                               tipc_subscrp_delete(sub);
-                               tipc_subscrb_put(subscriber);
-                       }
-                       break;
-               }
-       }
-       spin_unlock_bh(&subscriber->lock);
+       tipc_subscrb_subscrp_delete(subscriber, s);
 }
 
 static struct tipc_subscription *tipc_subscrp_create(struct net *net,
@@ -272,6 +291,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct 
net *net,
        sub->swap = swap;
        memcpy(&sub->evt.s, s, sizeof(*s));
        atomic_inc(&tn->subscription_count);
+       kref_init(&sub->kref);
        return sub;
 }
 
@@ -288,9 +308,9 @@ static void tipc_subscrp_subscribe(struct net *net, struct 
tipc_subscr *s,
 
        spin_lock_bh(&subscriber->lock);
        list_add(&sub->subscrp_list, &subscriber->subscrp_list);
-       tipc_subscrb_get(subscriber);
        sub->subscriber = subscriber;
        tipc_nametbl_subscribe(sub);
+       tipc_subscrb_get(subscriber);
        spin_unlock_bh(&subscriber->lock);
 
        timeout = htohl(sub->evt.s.timeout, swap);
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index be60103082c9..ffdc214c117a 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -57,6 +57,7 @@ struct tipc_subscriber;
  * @evt: template for events generated by subscription
  */
 struct tipc_subscription {
+       struct kref kref;
        struct tipc_subscriber *subscriber;
        struct net *net;
        struct timer_list timer;
-- 
2.1.4


------------------------------------------------------------------------------
Developer Access Program for Intel Xeon Phi Processors
Access to Intel Xeon Phi processor-based developer platforms.
With one year of Intel Parallel Studio XE.
Training and support from Colfax.
Order your platform today. http://sdm.link/xeonphi
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to