This is an automated email from the git hooks/post-receive script.

glondu pushed a commit to branch master
in repository nproc.

commit a0c274c275e009a58f23b79367e66c2fdd377b48
Author: Stephane Glondu <st...@glondu.net>
Date:   Wed Feb 18 14:07:01 2015 +0100

    Import commits from upstream master branch
---
 ...0002-Retry-waitpid-if-it-fails-with-EINTR.patch | 33 ++++++++
 ...x-for-concurrent-access-to-the-input-stre.patch | 57 ++++++++++++++
 ...re-efficient-fix-for-concurrent-access-to.patch | 89 ++++++++++++++++++++++
 debian/patches/series                              |  3 +
 4 files changed, 182 insertions(+)

diff --git a/debian/patches/0002-Retry-waitpid-if-it-fails-with-EINTR.patch b/debian/patches/0002-Retry-waitpid-if-it-fails-with-EINTR.patch
new file mode 100644
index 0000000..1596537
--- /dev/null
+++ b/debian/patches/0002-Retry-waitpid-if-it-fails-with-EINTR.patch
@@ -0,0 +1,33 @@
+From: Martin Jambon <mar...@mjambon.com>
+Date: Fri, 30 Dec 2011 23:10:27 -0800
+Subject: Retry waitpid if it fails with EINTR
+
+---
+ nproc.ml | 6 +++++-
+ 1 file changed, 5 insertions(+), 1 deletion(-)
+
+diff --git a/nproc.ml b/nproc.ml
+index b68ca1e..1b4a48f 100644
+--- a/nproc.ml
++++ b/nproc.ml
+@@ -149,6 +149,10 @@ struct
+     closed : bool ref;
+   }
+ 
++  let rec waitpid pid =
++    try Unix.waitpid [] pid
++    with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
++
+   (* --master-- *)
+   let pull_task kill_workers in_stream central_service worker =
+     (* Note: input and output file descriptors are automatically closed 
+@@ -266,7 +270,7 @@ struct
+               (try close_worker x with _ -> ());
+               (try
+                  Unix.kill x.worker_pid Sys.sigkill;
+-                 ignore (Unix.waitpid [] x.worker_pid)
++                 ignore (waitpid x.worker_pid)
+                with e ->
+                  !log_error
+                    (sprintf "kill worker %i: %s"
+-- 
diff --git a/debian/patches/0003-possible-fix-for-concurrent-access-to-the-input-stre.patch b/debian/patches/0003-possible-fix-for-concurrent-access-to-the-input-stre.patch
new file mode 100644
index 0000000..bd9d302
--- /dev/null
+++ b/debian/patches/0003-possible-fix-for-concurrent-access-to-the-input-stre.patch
@@ -0,0 +1,57 @@
+From: pveber <philippe.ve...@gmail.com>
+Date: Mon, 1 Apr 2013 11:43:07 +0200
+Subject: possible fix for concurrent access to the input stream
+
+in the master process, there are threads 1-to-1 associated with
+workers. They concurrently try to pull a task from a stream
+[in_stream] and send it to a worker, providing a waiter thread for
+delivering the output. However it seems that several worker-associated
+threads can read the same incoming value in the stream, perform the
+computation concurrently and try to send it back to the waiter. Since
+the waiter is woken up several times, this generates the exceptions
+[Invalid_argument("Lwt.wakeup_result")].
+
+The final result is correct, but resources are wasted, since some
+computation may be performed several times by several workers (and that
+really happens, since the exceptions are raised quite a few times).
+
+The proposed fix is to add a mutex for the access to [in_stream].
+---
+ nproc.ml | 11 ++++++++---
+ 1 file changed, 8 insertions(+), 3 deletions(-)
+
+diff --git a/nproc.ml b/nproc.ml
+index 1b4a48f..d42ccef 100644
+--- a/nproc.ml
++++ b/nproc.ml
+@@ -153,6 +153,8 @@ struct
+     try Unix.waitpid [] pid
+     with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
+ 
++  let mutex = Lwt_mutex.create ()
++
+   (* --master-- *)
+   let pull_task kill_workers in_stream central_service worker =
+     (* Note: input and output file descriptors are automatically closed 
+@@ -160,14 +162,17 @@ struct
+     let ic = Lwt_io.of_fd ~mode:Lwt_io.input worker.worker_in in
+     let oc = Lwt_io.of_fd ~mode:Lwt_io.output worker.worker_out in
+     let rec pull () =
+-      Lwt.bind (Lwt_stream.get in_stream) (
+-        function
+-            None -> Lwt.return ()
++      Lwt.bind (Lwt_mutex.lock mutex) (fun () ->
++        Lwt.bind (Lwt_stream.get in_stream) (
++          function
++          | None -> Lwt_mutex.unlock mutex ; Lwt.return ()
+           | Some (f, x, g) ->
++              Lwt_mutex.unlock mutex ;
+               let req = Worker_req (f, x) in
+               Lwt.bind
+                 (write_value oc req)
+                 (read_from_worker g)
++        )
+       )
+     and read_from_worker g () =
+       Lwt.try_bind
+-- 
diff --git a/debian/patches/0004-slightly-more-efficient-fix-for-concurrent-access-to.patch b/debian/patches/0004-slightly-more-efficient-fix-for-concurrent-access-to.patch
new file mode 100644
index 0000000..a17a976
--- /dev/null
+++ b/debian/patches/0004-slightly-more-efficient-fix-for-concurrent-access-to.patch
@@ -0,0 +1,89 @@
+From: pveber <philippe.ve...@gmail.com>
+Date: Mon, 1 Apr 2013 12:06:03 +0200
+Subject: slightly more efficient fix for concurrent access to [in_stream]
+
+now each pool has its own mutex.
+---
+ nproc.ml | 34 +++++++++++++++-------------------
+ 1 file changed, 15 insertions(+), 19 deletions(-)
+
+diff --git a/nproc.ml b/nproc.ml
+index d42ccef..62252ce 100644
+--- a/nproc.ml
++++ b/nproc.ml
+@@ -153,26 +153,21 @@ struct
+     try Unix.waitpid [] pid
+     with Unix.Unix_error (Unix.EINTR, _, _) -> waitpid pid
+ 
+-  let mutex = Lwt_mutex.create ()
+-
+   (* --master-- *)
+-  let pull_task kill_workers in_stream central_service worker =
++  let pull_task kill_workers in_stream in_stream_mutex central_service worker =
+     (* Note: input and output file descriptors are automatically closed 
+        when the end of the lwt channel is reached. *)
+     let ic = Lwt_io.of_fd ~mode:Lwt_io.input worker.worker_in in
+     let oc = Lwt_io.of_fd ~mode:Lwt_io.output worker.worker_out in
+     let rec pull () =
+-      Lwt.bind (Lwt_mutex.lock mutex) (fun () ->
+-        Lwt.bind (Lwt_stream.get in_stream) (
+-          function
+-          | None -> Lwt_mutex.unlock mutex ; Lwt.return ()
+-          | Some (f, x, g) ->
+-              Lwt_mutex.unlock mutex ;
+-              let req = Worker_req (f, x) in
+-              Lwt.bind
+-                (write_value oc req)
+-                (read_from_worker g)
+-        )
++      Lwt.bind (Lwt_mutex.with_lock in_stream_mutex (fun () -> Lwt_stream.get in_stream)) (
++        function
++        | None -> Lwt.return ()
++        | Some (f, x, g) ->
++            let req = Worker_req (f, x) in
++            Lwt.bind
++              (write_value oc req)
++              (read_from_worker g)
+       )
+     and read_from_worker g () =
+       Lwt.try_bind
+@@ -219,7 +214,7 @@ struct
+     pull ()
+ 
+   (* --master-- *)
+-  let create_gen init (in_stream, push) nproc central_service worker_data =
++  let create_gen init ((in_stream, push), in_stream_mutex) nproc central_service worker_data =
+     let proc_pool = Array.make nproc None in
+     Array.iteri (
+       fun i _ ->
+@@ -286,7 +281,7 @@ struct
+     let jobs =
+       Lwt.join 
+         (List.map
+-           (pull_task kill_workers in_stream central_service)
++           (pull_task kill_workers in_stream in_stream_mutex central_service)
+            worker_info)
+     in
+ 
+@@ -315,7 +310,7 @@ struct
+   let default_init worker_info = ()
+ 
+   let create ?(init = default_init) nproc central_service worker_data =
+-    create_gen init (Lwt_stream.create ()) nproc central_service worker_data
++    create_gen init (Lwt_stream.create (), Lwt_mutex.create ()) nproc central_service worker_data
+ 
+   let close p =
+     p.close ()
+@@ -402,8 +397,9 @@ struct
+       in
+       let p, t =
+         create_gen init 
+-          (task_stream,
+-           (fun _ -> assert false) (* push *))
++          ((task_stream,
++            (fun _ -> assert false) (* push *)),
++           Lwt_mutex.create ())
+           nproc serv env
+       in
+       try
+-- 
diff --git a/debian/patches/series b/debian/patches/series
index 4c90dad..64629b0 100644
--- a/debian/patches/series
+++ b/debian/patches/series
@@ -1 +1,4 @@
 0001-Do-not-install-redundant-files.patch
+0002-Retry-waitpid-if-it-fails-with-EINTR.patch
+0003-possible-fix-for-concurrent-access-to-the-input-stre.patch
+0004-slightly-more-efficient-fix-for-concurrent-access-to.patch

-- 
Alioth's /usr/local/bin/git-commit-notice on 
/srv/git.debian.org/git/pkg-ocaml-maint/packages/nproc.git

_______________________________________________
Pkg-ocaml-maint-commits mailing list
Pkg-ocaml-maint-commits@lists.alioth.debian.org
http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/pkg-ocaml-maint-commits

Reply via email to