On 01/15/2017 12:23 PM, Ying Xue wrote: > Hi Parath, > > I am glad to see you to follow my suggestion to solve the issue by > introducing refcnt tipc_subscription struct. I think this is an only > appropriate method to perfectly fix issues we met before. In fact we had > tried several times to resolve kinds of issues happened in this part. > But unfortunately there are still other side effects introduced or other > problems remained. In this time, I hope we can drastically repair > topology server. > > As the life cycle of both tipc_subscription and tipc_subscriber objects > have been managed with refcnt, we should follow the generic principles > of refcnt usage: > - Rule1: Acquire refcnt before its object is used, and decrease its > refcnt after its object is released > - Rule2: We can safely touch objects tracked with refnct in kinds of > different asynchronous contexts. > > Therefore, it's unnecessary for us to use del_timer_sync() to > synchronously delete subscription timer at all. Instead, if you > asynchronously delete subscription timer, the code will becomes pretty > simple and easily readable. > Hi Ying,
Thanks for the valuable comments. I will post a patch with your comments soon, the code looks much cleaner with your approach. /Partha > > On 01/11/2017 08:19 PM, Parthasarathy Bhuvaragan wrote: >> Until now, the subscriptions are deleted at cancel or subscriber >> delete by checking for pending timer using del_timer(). >> del_timer() is not SMP safe, if on CPU0 the check for pending timer >> returns true but CPU1 might schedule the timer callback thereby >> deleting the subscription. Thus when CPU0 proceeds to delete the >> invalid subscription. >> >> In this commit, we do the following updates to fix it: >> 1. perform refcount per subscription in addition to subscriber. >> 2. use SMP safe del_timer_sync() to remove pending timers. >> >> Signed-off-by: Parthasarathy Bhuvaragan >> <parthasarathy.bhuvara...@ericsson.com> >> --- >> net/tipc/subscr.c | 116 >> ++++++++++++++++++++++++++++++++---------------------- >> net/tipc/subscr.h | 1 + >> 2 files changed, 69 insertions(+), 48 deletions(-) >> >> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c >> index 0dd02244e21d..d3f450037ad5 100644 >> --- a/net/tipc/subscr.c >> +++ b/net/tipc/subscr.c >> @@ -54,6 +54,7 @@ struct tipc_subscriber { >> >> static void tipc_subscrp_delete(struct tipc_subscription *sub); >> static void tipc_subscrb_put(struct tipc_subscriber *subscriber); >> +static void tipc_subscrp_put(struct tipc_subscription *subscription); >> >> /** >> * htohl - convert value to endianness used by destination >> @@ -143,19 +144,14 @@ static void tipc_subscrp_timeout(unsigned long >> data) >> tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, >> sub->evt.s.seq.upper, >> TIPC_SUBSCR_TIMEOUT, 0, 0); >> >> - spin_lock_bh(&subscriber->lock); >> - tipc_subscrp_delete(sub); >> - spin_unlock_bh(&subscriber->lock); >> - >> + tipc_nametbl_unsubscribe(sub); >> + tipc_subscrp_put(sub); >> tipc_subscrb_put(subscriber); >> } >> >> static void tipc_subscrb_kref_release(struct kref *kref) >> { >> - struct tipc_subscriber *subcriber = container_of(kref, >> - struct tipc_subscriber, kref); >> - >> - kfree(subcriber); >> + kfree(container_of(kref,struct tipc_subscriber, kref)); >> } >> >> static void tipc_subscrb_put(struct tipc_subscriber *subscriber) >> @@ -168,6 +164,58 @@ static void tipc_subscrb_get(struct >> tipc_subscriber *subscriber) >> kref_get(&subscriber->kref); >> } >> >> +static void tipc_subscrp_kref_release(struct kref *kref) >> +{ >> + struct tipc_subscription *sub; >> + struct tipc_net *tn; >> + >> + sub = container_of(kref, struct tipc_subscription, kref); >> + tn = net_generic(sub->net, tipc_net_id); >> + >> + spin_lock_bh(&sub->subscriber->lock); >> + list_del(&sub->subscrp_list); >> + atomic_dec(&tn->subscription_count); >> + spin_unlock_bh(&sub->subscriber->lock); >> + kfree(sub); >> +} >> + >> +static void tipc_subscrp_put(struct tipc_subscription *subscription) >> +{ >> + kref_put(&subscription->kref, tipc_subscrp_kref_release); >> +} >> + >> +static void tipc_subscrp_get(struct tipc_subscription *subscription) >> +{ >> + kref_get(&subscription->kref); >> +} >> + >> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all >> + * subscriptions for a given subscriber. >> + */ >> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber >> *subscriber, >> + struct tipc_subscr *s) >> +{ >> + struct tipc_subscription *sub, *temp; >> + >> + spin_lock_bh(&subscriber->lock); >> + list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, >> + subscrp_list) { >> + if (s && memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) >> + continue; >> + /* Delete subscriptions with pending timers */ >> + if (timer_pending(&sub->timer)) { >> + tipc_subscrp_get(sub); > > If you follow my suggestion, it's unnecessary to temporarily grab sub > refcnt at all, and here code will be more simpler. > >> + spin_unlock_bh(&subscriber->lock); >> + tipc_subscrp_delete(sub); >> + tipc_subscrp_put(sub); >> + spin_lock_bh(&subscriber->lock); >> + } >> + if (s) >> + break; >> + } >> + spin_unlock_bh(&subscriber->lock); >> +} >> + >> static struct tipc_subscriber *tipc_subscrb_create(int conid) >> { >> struct tipc_subscriber *subscriber; >> @@ -177,8 +225,8 @@ static struct tipc_subscriber >> *tipc_subscrb_create(int conid) >> pr_warn("Subscriber rejected, no memory\n"); >> return NULL; >> } >> - kref_init(&subscriber->kref); >> INIT_LIST_HEAD(&subscriber->subscrp_list); >> + kref_init(&subscriber->kref); >> subscriber->conid = conid; >> spin_lock_init(&subscriber->lock); >> >> @@ -187,55 +235,26 @@ static struct tipc_subscriber >> *tipc_subscrb_create(int conid) >> >> static void tipc_subscrb_delete(struct tipc_subscriber *subscriber) >> { >> - struct tipc_subscription *sub, *temp; >> - u32 timeout; >> - >> - spin_lock_bh(&subscriber->lock); >> - /* Destroy any existing subscriptions for subscriber */ >> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, >> - subscrp_list) { >> - timeout = htohl(sub->evt.s.timeout, sub->swap); >> - if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) { >> - tipc_subscrp_delete(sub); >> - tipc_subscrb_put(subscriber); >> - } >> - } >> - spin_unlock_bh(&subscriber->lock); >> - >> + tipc_subscrb_subscrp_delete(subscriber, NULL); >> tipc_subscrb_put(subscriber); >> } >> >> static void tipc_subscrp_delete(struct tipc_subscription *sub) >> { >> - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); >> + struct tipc_subscriber *subscriber = sub->subscriber; >> + u32 timeout = htohl(sub->evt.s.timeout, sub->swap); >> >> - tipc_nametbl_unsubscribe(sub); >> - list_del(&sub->subscrp_list); >> - kfree(sub); >> - atomic_dec(&tn->subscription_count); >> + if (timeout == TIPC_WAIT_FOREVER || del_timer_sync(&sub->timer)) { >> + tipc_nametbl_unsubscribe(sub); >> + tipc_subscrp_put(sub); >> + tipc_subscrb_put(subscriber); > > I suggest that tipc_nametbl_unsubscribe() can be moved to > tipc_subscrp_kref_release(). > > If so, here is very simple: > > tipc_subscrp_delete() > { > del_timer(&sub->timer); > tipc_subscrp_put(sub); > tipc_subscrb_put(subscriber); > } > >> + } >> } >> >> static void tipc_subscrp_cancel(struct tipc_subscr *s, >> struct tipc_subscriber *subscriber) >> { >> - struct tipc_subscription *sub, *temp; >> - u32 timeout; >> - >> - spin_lock_bh(&subscriber->lock); >> - /* Find first matching subscription, exit if not found */ >> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, >> - subscrp_list) { >> - if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { >> - timeout = htohl(sub->evt.s.timeout, sub->swap); >> - if ((timeout == TIPC_WAIT_FOREVER) || >> - del_timer(&sub->timer)) { >> - tipc_subscrp_delete(sub); >> - tipc_subscrb_put(subscriber); >> - } >> - break; >> - } >> - } >> - spin_unlock_bh(&subscriber->lock); >> + tipc_subscrb_subscrp_delete(subscriber, s); >> } >> >> static struct tipc_subscription *tipc_subscrp_create(struct net *net, >> @@ -272,6 +291,7 @@ static struct tipc_subscription >> *tipc_subscrp_create(struct net *net, >> sub->swap = swap; >> memcpy(&sub->evt.s, s, sizeof(*s)); >> atomic_inc(&tn->subscription_count); >> + kref_init(&sub->kref); >> return sub; >> } >> >> @@ -288,9 +308,9 @@ static void tipc_subscrp_subscribe(struct net >> *net, struct tipc_subscr *s, >> >> spin_lock_bh(&subscriber->lock); >> list_add(&sub->subscrp_list, &subscriber->subscrp_list); >> - tipc_subscrb_get(subscriber); >> sub->subscriber = subscriber; >> tipc_nametbl_subscribe(sub); >> + tipc_subscrb_get(subscriber); >> spin_unlock_bh(&subscriber->lock); >> >> timeout = htohl(sub->evt.s.timeout, swap); > > Before sub->timer is started, we should first grab refcnt of sub. > > Moreover, I suggest it's unnecessary to deem TIPC_WAIT_FOREVER as a > special case. In other words, no matter what timeout of sub is, we > should first grab sub refcnt and setup its timer. But if timeout is > TIPC_WAIT_FOREVER, we don't call mod_timer(). > > This method brings us a benefit that we don't identify whether timer is > TIPC_WAIT_FOREVER at all when sub is deleted. > >> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h >> index be60103082c9..ffdc214c117a 100644 >> --- a/net/tipc/subscr.h >> +++ b/net/tipc/subscr.h >> @@ -57,6 +57,7 @@ struct tipc_subscriber; >> * @evt: template for events generated by subscription >> */ >> struct tipc_subscription { >> + struct kref kref; >> struct tipc_subscriber *subscriber; >> struct net *net; >> struct timer_list timer; >> > > Anyway, if you have question about my suggestions, I can create a patch > based on your version to show how my idea works. > > Thanks, > Ying > ------------------------------------------------------------------------------ Developer Access Program for Intel Xeon Phi Processors Access to Intel Xeon Phi processor-based developer platforms. With one year of Intel Parallel Studio XE. Training and support from Colfax. Order your platform today. http://sdm.link/xeonphi _______________________________________________ tipc-discussion mailing list tipc-discussion@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/tipc-discussion