----- Original Message -----
> From: "Keir Fraser" <[email protected]>
> To: [email protected]
> Cc: "Mathieu Desnoyers" <[email protected]>, "Paul E. McKenney" 
> <[email protected]>
> Sent: Friday, April 18, 2014 4:12:49 PM
> Subject: [PATCH liburcu] Fix lifetime of rcu_barrier()'s completion structure
> 
> [Another attachment; I still need to sort out integration with git send-email]
> 
> rcu_barrier() can return as soon as completion.barrier_count==0, which
> frees the completion struct along with the rest of its stack frame. But
> its call_rcu callbacks may yet try to read and write completion.futex
> via the wake_up function.
> 
> Fix this by calloc()ing the completion struct and implementing a
> reference count to determine when it is eventually free()d.
> 
> This also fixes bug #787, since calloc() initialises all fields of the
> structure to zero.

I slightly edited your patch to use the urcu_ref API (urcu/ref.h) for the
reference count, and uatomic_sub_return() rather than uatomic_add_return()
with a negative value (simple style fix). Please let me know if you are OK
with the attached patch.

Thanks!

Mathieu

-- 
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com
commit 66de1f4e8e636e3ff326b4447b72c82ca78cf98a
Author: Keir Fraser <[email protected]>
Date:   Sat Apr 19 15:59:01 2014 -0400

    Fix: Do not free the rcu_barrier() completion struct until all threads are done with it
    
    It cannot reside on the waiter's stack as rcu_barrier() may return
    before the call_rcu handlers have finished checking whether it needs a
    futex wakeup. Instead we dynamically allocate the structure and
    determine its lifetime with a reference count.
    
    Signed-off-by: Keir Fraser <[email protected]>
    [ Edit by Mathieu Desnoyers: use urcu/ref.h. Cleanup: use
      uatomic_sub_return() rather than uatomic_add_return() with negative
      value. ]
    Signed-off-by: Mathieu Desnoyers <[email protected]>

diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index f0751f5..a55ac08 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -42,6 +42,7 @@
 #include "urcu/list.h"
 #include "urcu/futex.h"
 #include "urcu/tls-compat.h"
+#include "urcu/ref.h"
 #include "urcu-die.h"
 
 /* Data structure that identifies a call_rcu thread. */
@@ -67,6 +68,7 @@ struct call_rcu_data {
 struct call_rcu_completion {
 	int barrier_count;
 	int32_t futex;
+	struct urcu_ref ref;
 };
 
 struct call_rcu_completion_work {
@@ -769,6 +771,15 @@ void free_all_cpu_call_rcu_data(void)
 }
 
 static
+void free_completion(struct urcu_ref *ref)
+{
+	struct call_rcu_completion *completion;
+
+	completion = caa_container_of(ref, struct call_rcu_completion, ref);
+	free(completion);
+}
+
+static
 void _rcu_barrier_complete(struct rcu_head *head)
 {
 	struct call_rcu_completion_work *work;
@@ -776,8 +787,9 @@ void _rcu_barrier_complete(struct rcu_head *head)
 
 	work = caa_container_of(head, struct call_rcu_completion_work, head);
 	completion = work->completion;
-	uatomic_dec(&completion->barrier_count);
-	call_rcu_completion_wake_up(completion);
+	if (!uatomic_sub_return(&completion->barrier_count, 1))
+		call_rcu_completion_wake_up(completion);
+	urcu_ref_put(&completion->ref, free_completion);
 	free(work);
 }
 
@@ -787,7 +799,7 @@ void _rcu_barrier_complete(struct rcu_head *head)
 void rcu_barrier(void)
 {
 	struct call_rcu_data *crdp;
-	struct call_rcu_completion completion;
+	struct call_rcu_completion *completion;
 	int count = 0;
 	int was_online;
 
@@ -809,12 +821,17 @@ void rcu_barrier(void)
 		goto online;
 	}
 
+	completion = calloc(sizeof(*completion), 1);
+	if (!completion)
+		urcu_die(errno);
+
 	call_rcu_lock(&call_rcu_mutex);
 	cds_list_for_each_entry(crdp, &call_rcu_data_list, list)
 		count++;
 
-	completion.barrier_count = count;
-	completion.futex = 0;
+	/* Referenced by rcu_barrier() and each call_rcu thread. */
+	urcu_ref_set(&completion->ref, count + 1);
+	completion->barrier_count = count;
 
 	cds_list_for_each_entry(crdp, &call_rcu_data_list, list) {
 		struct call_rcu_completion_work *work;
@@ -822,20 +839,23 @@ void rcu_barrier(void)
 		work = calloc(sizeof(*work), 1);
 		if (!work)
 			urcu_die(errno);
-		work->completion = &completion;
+		work->completion = completion;
 		_call_rcu(&work->head, _rcu_barrier_complete, crdp);
 	}
 	call_rcu_unlock(&call_rcu_mutex);
 
 	/* Wait for them */
 	for (;;) {
-		uatomic_dec(&completion.futex);
+		uatomic_dec(&completion->futex);
 		/* Decrement futex before reading barrier_count */
 		cmm_smp_mb();
-		if (!uatomic_read(&completion.barrier_count))
+		if (!uatomic_read(&completion->barrier_count))
 			break;
-		call_rcu_completion_wait(&completion);
+		call_rcu_completion_wait(completion);
 	}
+
+	urcu_ref_put(&completion->ref, free_completion);
+
 online:
 	if (was_online)
 		rcu_thread_online();
_______________________________________________
lttng-dev mailing list
[email protected]
http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

Reply via email to