On 01/18/2017 01:07 PM, Parthasarathy Bhuvaragan wrote: > On 01/18/2017 11:06 AM, Ying Xue wrote: >> On 01/16/2017 11:41 PM, Parthasarathy Bhuvaragan wrote: >>> Until now, the subscribers keep track of the subscriptions using >>> reference count at subscriber level. At subscription cancel or >>> subscriber delete, we delete the subscription by checking for >>> pending timer using del_timer(). del_timer() is not SMP safe, if >>> on CPU0 the check for pending timer returns true but CPU1 might >>> schedule the timer callback thereby deleting the subscription. >>> Thus when CPU0 is scheduled, it deletes an invalid subscription. >>> >>> We can fix the above issue using either a synchronous timer function >>> or by introducing subscription reference count. The later solution >>> is preferred as it's consistent with the asynchronous design model >>> used by the rest of the code. >>> >>> In this commit, we introduce subscription refcount to avoid deleting >>> an invalid subscription. >>> >>> Signed-off-by: Parthasarathy Bhuvaragan >>> <parthasarathy.bhuvara...@ericsson.com> >>> --- >>> v2: Address comments from Ying Xue, to avoid race conditions by using >>> refcount only instead of del_timer_sync(). >>> --- >>> net/tipc/subscr.c | 120 >>> +++++++++++++++++++++++++++++------------------------- >>> net/tipc/subscr.h | 1 + >>> 2 files changed, 66 insertions(+), 55 deletions(-) >>> >>> diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c >>> index 0dd02244e21d..d6c1f35f6518 100644 >>> --- a/net/tipc/subscr.c >>> +++ b/net/tipc/subscr.c >>> @@ -54,6 +54,7 @@ struct tipc_subscriber { >>> >>> static void tipc_subscrp_delete(struct tipc_subscription *sub); >>> static void tipc_subscrb_put(struct tipc_subscriber *subscriber); >>> +static void tipc_subscrp_put(struct tipc_subscription *subscription); >>> >>> /** >>> * htohl - convert value to endianness used by destination >>> @@ -137,25 +138,17 @@ void tipc_subscrp_report_overlap(struct >>> tipc_subscription *sub, u32 found_lower, >>> static void tipc_subscrp_timeout(unsigned long data) >>> { >>> struct tipc_subscription *sub = (struct tipc_subscription *)data; >>> - struct tipc_subscriber *subscriber = sub->subscriber; >>> >>> /* Notify subscriber of timeout */ >>> tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, >>> sub->evt.s.seq.upper, >>> TIPC_SUBSCR_TIMEOUT, 0, 0); >>> >>> - spin_lock_bh(&subscriber->lock); >>> - tipc_subscrp_delete(sub); >>> - spin_unlock_bh(&subscriber->lock); >>> - >>> - tipc_subscrb_put(subscriber); >>> + tipc_subscrp_put(sub); >>> } >>> >>> static void tipc_subscrb_kref_release(struct kref *kref) >>> { >>> - struct tipc_subscriber *subcriber = container_of(kref, >>> - struct tipc_subscriber, kref); >>> - >>> - kfree(subcriber); >>> + kfree(container_of(kref,struct tipc_subscriber, kref)); >>> } >>> >>> static void tipc_subscrb_put(struct tipc_subscriber *subscriber) >>> @@ -168,6 +161,56 @@ static void tipc_subscrb_get(struct >>> tipc_subscriber *subscriber) >>> kref_get(&subscriber->kref); >>> } >>> >>> +static void tipc_subscrp_kref_release(struct kref *kref) >>> +{ >>> + struct tipc_subscription *sub = container_of(kref, >>> + struct tipc_subscription, >>> + kref); >>> + struct tipc_net *tn = net_generic(sub->net, tipc_net_id); >>> + struct tipc_subscriber *subscriber = sub->subscriber; >>> + >>> + spin_lock_bh(&subscriber->lock); >>> + tipc_nametbl_unsubscribe(sub); >>> + list_del(&sub->subscrp_list); >>> + atomic_dec(&tn->subscription_count); >>> + spin_unlock_bh(&subscriber->lock); >>> + kfree(sub); >>> + tipc_subscrb_put(subscriber); >>> +} >>> + >>> +static void tipc_subscrp_put(struct tipc_subscription *subscription) >>> +{ >>> + kref_put(&subscription->kref, tipc_subscrp_kref_release); >>> +} >>> + >>> +static void tipc_subscrp_get(struct tipc_subscription *subscription) >>> +{ >>> + kref_get(&subscription->kref); >>> +} >>> + >>> +/* tipc_subscrb_subscrp_delete - delete a specific subscription or all >>> + * subscriptions for a given subscriber. >>> + */ >>> +static void tipc_subscrb_subscrp_delete(struct tipc_subscriber >>> *subscriber, >>> + struct tipc_subscr *s) >>> +{ >>> + struct tipc_subscription *sub, *temp; >>> + >>> + spin_lock_bh(&subscriber->lock); >>> + list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, >>> + subscrp_list) { >>> + spin_unlock_bh(&subscriber->lock); >>> + if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { >>> + tipc_subscrp_delete(sub); >>> + return; >>> + } else { >>> + tipc_subscrp_delete(sub); >>> + } >> >> If we change above code as below, it seems more simple: >> >> list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, >> subscrp_list) { >> spin_unlock_bh(&subscriber->lock); >> tipc_subscrp_delete(sub); > Ying, > In the case of subscription cancel request, I need to find the correct > subscription and delete only that subscription. If we use the above > version, we will end up deleting all previous subscriptions in the list > for the given subscriber. Ying, My patch was wrong and there itself I delete all prior subscriptions for a given subscriber until the specific subscription. I will spin a new version with the fix.
/Partha > > Thanks for the comments and feedback. > > I found the fix for John's issue and will send a new version soon. > > /Partha >> if (s && !memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) >> return; >> spin_lock_bh(&subscriber->lock); >> } >> >> >> Besides the commit,this version is exactly what I expect. It's very good! >> >> Thanks for the nice job! >> >> Please add my ack-by flag to it. >> >> Regards, >> Ying >> >>> + spin_lock_bh(&subscriber->lock); >>> + } >>> + spin_unlock_bh(&subscriber->lock); >>> +} >>> + >>> static struct tipc_subscriber *tipc_subscrb_create(int conid) >>> { >>> struct tipc_subscriber *subscriber; >>> @@ -177,8 +220,8 @@ static struct tipc_subscriber >>> *tipc_subscrb_create(int conid) >>> pr_warn("Subscriber rejected, no memory\n"); >>> return NULL; >>> } >>> - kref_init(&subscriber->kref); >>> INIT_LIST_HEAD(&subscriber->subscrp_list); >>> + kref_init(&subscriber->kref); >>> subscriber->conid = conid; >>> spin_lock_init(&subscriber->lock); >>> >>> @@ -187,55 +230,21 @@ static struct tipc_subscriber >>> *tipc_subscrb_create(int conid) >>> >>> static void tipc_subscrb_delete(struct tipc_subscriber *subscriber) >>> { >>> - struct tipc_subscription *sub, *temp; >>> - u32 timeout; >>> - >>> - spin_lock_bh(&subscriber->lock); >>> - /* Destroy any existing subscriptions for subscriber */ >>> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, >>> - subscrp_list) { >>> - timeout = htohl(sub->evt.s.timeout, sub->swap); >>> - if ((timeout == TIPC_WAIT_FOREVER) || del_timer(&sub->timer)) { >>> - tipc_subscrp_delete(sub); >>> - tipc_subscrb_put(subscriber); >>> - } >>> - } >>> - spin_unlock_bh(&subscriber->lock); >>> - >>> + tipc_subscrb_subscrp_delete(subscriber, NULL); >>> tipc_subscrb_put(subscriber); >>> } >>> >>> static void tipc_subscrp_delete(struct tipc_subscription *sub) >>> { >>> - struct tipc_net *tn = net_generic(sub->net, tipc_net_id); >>> - >>> - tipc_nametbl_unsubscribe(sub); >>> - list_del(&sub->subscrp_list); >>> - kfree(sub); >>> - atomic_dec(&tn->subscription_count); >>> + if (del_timer(&sub->timer)) >>> + tipc_subscrp_put(sub); >>> + tipc_subscrp_put(sub); >>> } >>> >>> static void tipc_subscrp_cancel(struct tipc_subscr *s, >>> struct tipc_subscriber *subscriber) >>> { >>> - struct tipc_subscription *sub, *temp; >>> - u32 timeout; >>> - >>> - spin_lock_bh(&subscriber->lock); >>> - /* Find first matching subscription, exit if not found */ >>> - list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list, >>> - subscrp_list) { >>> - if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { >>> - timeout = htohl(sub->evt.s.timeout, sub->swap); >>> - if ((timeout == TIPC_WAIT_FOREVER) || >>> - del_timer(&sub->timer)) { >>> - tipc_subscrp_delete(sub); >>> - tipc_subscrb_put(subscriber); >>> - } >>> - break; >>> - } >>> - } >>> - spin_unlock_bh(&subscriber->lock); >>> + tipc_subscrb_subscrp_delete(subscriber, s); >>> } >>> >>> static struct tipc_subscription *tipc_subscrp_create(struct net *net, >>> @@ -272,6 +281,7 @@ static struct tipc_subscription >>> *tipc_subscrp_create(struct net *net, >>> sub->swap = swap; >>> memcpy(&sub->evt.s, s, sizeof(*s)); >>> atomic_inc(&tn->subscription_count); >>> + kref_init(&sub->kref); >>> return sub; >>> } >>> >>> @@ -288,17 +298,17 @@ static void tipc_subscrp_subscribe(struct net >>> *net, struct tipc_subscr *s, >>> >>> spin_lock_bh(&subscriber->lock); >>> list_add(&sub->subscrp_list, &subscriber->subscrp_list); >>> - tipc_subscrb_get(subscriber); >>> sub->subscriber = subscriber; >>> tipc_nametbl_subscribe(sub); >>> + tipc_subscrb_get(subscriber); >>> spin_unlock_bh(&subscriber->lock); >>> >>> + tipc_subscrp_get(sub); >>> + setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); >>> timeout = htohl(sub->evt.s.timeout, swap); >>> - if (timeout == TIPC_WAIT_FOREVER) >>> - return; >>> >>> - setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub); >>> - mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); >>> + if (timeout != TIPC_WAIT_FOREVER) >>> + mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); >>> } >>> >>> /* Handle one termination request for the subscriber */ >>> diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h >>> index be60103082c9..ffdc214c117a 100644 >>> --- a/net/tipc/subscr.h >>> +++ b/net/tipc/subscr.h >>> @@ -57,6 +57,7 @@ struct tipc_subscriber; >>> * @evt: template for events generated by subscription >>> */ >>> struct tipc_subscription { >>> + struct kref kref; >>> struct tipc_subscriber *subscriber; >>> struct net *net; >>> struct timer_list timer; >>> >> ------------------------------------------------------------------------------ Check out the vibrant tech community on one of the world's most engaging tech sites, SlashDot.org! http://sdm.link/slashdot _______________________________________________ tipc-discussion mailing list tipc-discussion@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/tipc-discussion