This is an automated email from the git hooks/post-receive script. glondu pushed a commit to branch master in repository parmap.
commit 218dd85ad9cfba746b9a604a69d13e07b5780642 Author: Stephane Glondu <st...@glondu.net> Date: Sat Jul 22 12:25:31 2017 +0200 New upstream version 1.0~rc8 --- ChangeLog | 9 -- Changelog | 20 ++- Makefile.in | 2 +- README.maintainer | 11 ++ myocamlbuild.ml | 4 +- opam | 28 ++++ parmap.ml | 398 ++++++++++++++++++++++++---------------------- parmap.mldylib | 1 + parmap.mli | 9 ++ parmap.mllib | 1 + setcore.mli => setcore.ml | 0 11 files changed, 273 insertions(+), 210 deletions(-) diff --git a/ChangeLog b/ChangeLog deleted file mode 100644 index 654864e..0000000 --- a/ChangeLog +++ /dev/null @@ -1,9 +0,0 @@ -Version 0.9.8 contains the following major new features w.r.t. 0.9.4 - - a chunksize parameter can be used to control the granularity of the - parallelism: each worker will handle a series of chunks of this size - and ask for them when ready, thus allowing the system to achieve - automatic load balancing - - very specialised versions of the map function are now available for - arrays and float arrays, allowing to obtain significant speed-up even - on relatively light computations - - autoconf and ocamlbuild harness should simplify compilation and installation diff --git a/Changelog b/Changelog index 1d0aac9..c85ac01 100644 --- a/Changelog +++ b/Changelog @@ -1,4 +1,16 @@ -2011/08/30 (RDC): internally convert lists to array to avoid quadratic penalty in execution time on long lists, - thanks to Paul Vernaza <pvern...@andrew.cmu.edu> for pointing out this issu; - added 'a sequence type to allow using efficiently the library both with lists and arrays. - +2011/11/30 (RDC) + Version 0.9.8 contains the following major new features w.r.t. 0.9.4 + - a chunksize parameter can be used to control the granularity of the + parallelism: each worker will handle a series of chunks of this size + and ask for them when ready, thus allowing the system to achieve + automatic load balancing + - very specialised versions of the map function are now available for + arrays and float arrays, allowing to obtain significant speed-up even + on relatively light computations + - autoconf and ocamlbuild harness should simplify compilation and installation. + + +2011/08/30 (RDC) + internally convert lists to array to avoid quadratic penalty in execution time on long lists, + thanks to Paul Vernaza <pvern...@andrew.cmu.edu> for pointing out this issue; + added 'a sequence type to allow using efficiently the library both with lists and arrays. diff --git a/Makefile.in b/Makefile.in index 6c0ee0b..827a072 100644 --- a/Makefile.in +++ b/Makefile.in @@ -78,7 +78,7 @@ examples: INSTALL_STUFF = META -INSTALL_STUFF += $(wildcard _build/*.cma _build/*.cmxa _build/*.cmxs) +INSTALL_STUFF += $(wildcard _build/*.cma _build/*.cmx _build/*.cmxa _build/*.cmxs) INSTALL_STUFF += $(filter-out $(wildcard _build/myocamlbuild.*),$(wildcard _build/*.mli _build/*.cmi)) INSTALL_STUFF += $(wildcard _build/*.so _build/*.a) diff --git a/README.maintainer b/README.maintainer new file mode 100644 index 0000000..9cb1458 --- /dev/null +++ b/README.maintainer @@ -0,0 +1,11 @@ +Notice to maintainers and contributors. + +Dependencies +------------ + +The list of modules that are packed into parmap.cm{,x}a is declared in parmap.mllib + +The list of modules that are packed into parmap.cmxs is stated in parmap.mldylib + +These informations are used by ocamlbuild to create the libraries, please keep +them up to date even in case an alternative build system is used. diff --git a/myocamlbuild.ml b/myocamlbuild.ml index 9654ea2..b847cf9 100644 --- a/myocamlbuild.ml +++ b/myocamlbuild.ml @@ -7,9 +7,9 @@ let _ = dispatch begin function flag ["compile"; "c"] & S[ A"-ccopt"; A"-D_GNU_SOURCE"; A"-ccopt"; A"-fPIC" ]; flag ["link"; "library"; "ocaml"; "byte"; "use_libparmap"] & - S[A"-dllib"; A"-lparmap_stubs";]; + S[A"-dllib"; A"-lparmap_stubs"; ]; flag ["link"; "library"; "ocaml"; "native"; "use_libparmap"] & - S[A"-cclib"; A"-lparmap_stubs"]; + S[A"-cclib"; A"-lparmap_stubs"; ]; dep ["link"; "ocaml"; "use_libparmap"] ["libparmap_stubs.a"]; flag ["link"; "ocaml"; "link_libparmap"] (A"libparmap_stubs.a"); diff --git a/opam b/opam new file mode 100644 index 0000000..69ddbc9 --- /dev/null +++ b/opam @@ -0,0 +1,28 @@ +opam-version: "1.2" +maintainer: "Roberto Di Cosmo <robe...@dicosmo.org>" +authors: "Roberto Di Cosmo <robe...@dicosmo.org>" +homepage: "https://github.com/rdicosmo/parmap" +dev-repo: "https://github.com/rdicosmo/parmap.git" +bug-reports: "https://github.com/rdicosmo/parmap/issues" +build: [ + ["aclocal" "-I" "m4"] + ["autoconf"] + ["autoheader"] + ["./configure"] + [make "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib" ] +] +install: [ + [make "install" "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib"] +] +remove: [ + ["aclocal" "-I" "m4"] + ["autoconf"] + ["autoheader"] + ["./configure"] + [make "uninstall" "DESTDIR=%{prefix}%" "OCAMLLIBDIR=lib"] +] +depends: [ + "ocamlfind" + "ocamlbuild" {build} + "conf-autoconf" +] diff --git a/parmap.ml b/parmap.ml index 74f3669..a8da654 100644 --- a/parmap.ml +++ b/parmap.ml @@ -30,6 +30,19 @@ let default_ncores=ref (max 2 (Setcore.numcores()-1));; let set_default_ncores n = default_ncores := n;; let get_default_ncores () = !default_ncores;; +let ncores = ref 0;; + +let set_ncores n = ncores := n;; +let get_ncores () = !ncores + +(* worker process rank *) + +let masters_rank = -1 +let rank = ref masters_rank + +let set_rank n = rank := n +let get_rank () = !rank + (* exception handling code *) let handle_exc core msg = @@ -42,10 +55,14 @@ let can_redirect path = try Unix.mkdir path 0o777; true with Unix.Unix_error(e,_s,_s') -> - (Printf.eprintf "[Pid %d]: Error creating %s : %s; proceeding without \ - stdout/stderr redirection\n%!" - (Unix.getpid ()) path (Unix.error_message e)); - false + (* another job may have created it between the check and the mkdir *) + if e == Unix.EEXIST then true + else begin + (Printf.eprintf "[Pid %d]: Error creating %s : %s; proceeding \ + without stdout/stderr redirection\n%!" + (Unix.getpid ()) path (Unix.error_message e)); + false + end else true let log_debug fmt = @@ -113,100 +130,113 @@ let marshal fd v = let s = Marshal.to_string v [Marshal.Closures] in ignore(Bytearray.mmap_of_string fd s) +(* Exit the program with calling [at_exit] handlers *) +external sys_exit : int -> 'a = "caml_sys_exit" + +let spawn_many n ~in_subprocess = + let rec loop i acc = + if i = n then + acc + else + match Unix.fork() with + 0 -> + (* [at_exit] handlers are called in reverse order of registration. + By registering a handler that exits prematurely, we prevent the + execution of handlers registered before the fork. + + This ignores the exit code provided by the user, but we ignore + it anyway in [wait_for_pids]. + *) + at_exit (fun () -> sys_exit 0); + set_rank i; + in_subprocess i; + exit 0 + | -1 -> + Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; + loop (i + 1) acc + | pid -> + loop (i + 1) (pid :: acc) + in + (* call the GC before forking *) + Gc.compact (); + loop 0 [] + +let wait_for_pids pids = + let rec wait_for_pid pid = + try ignore(Unix.waitpid [] pid : int * Unix.process_status) + with + | Unix.Unix_error (Unix.ECHILD, _, _) -> () + | Unix.Unix_error (Unix.EINTR, _, _) -> wait_for_pid pid + in + List.iter wait_for_pid pids + +let run_many n ~in_subprocess = + wait_for_pids (spawn_many n ~in_subprocess) + (* a simple mapper function that computes 1/nth of the data on each of the n cores in one iteration *) -let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores compute opid al collect = +let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores' compute opid al collect = (* flush everything *) flush_all(); (* init task parameters *) let ln = Array.length al in - let ncores = min ln (max 1 ncores) in - let chunksize = max 1 (ln/ncores) in + set_ncores (min ln (max 1 ncores')); + let chunksize = max 1 (ln / !ncores) in log_debug "simplemapper on %d elements, on %d cores, chunksize = %d%!" - ln ncores chunksize; + ln !ncores chunksize; (* create descriptors to mmap *) - let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in - (* call the GC before forking *) - Gc.compact (); - (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let lo=i*chunksize in - let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of a total of \ - %d got exception %s on core %d \n%!" - j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; - exit 1 - in - let v = compute al lo hi opid exc_handler in - marshal fdarr.(i) v; - exit 0 - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; - (* wait for all children *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done; + let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in + (* run children *) + run_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let lo=i*chunksize in + let hi=if i = !ncores - 1 then ln - 1 else (i + 1) * chunksize - 1 in + let exc_handler e j = (* handle an exception at index j *) + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of a total of \ + %d got exception %s on core %d \n%!" + j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; + exit 1 + in + let v = compute al lo hi opid exc_handler in + marshal fdarr.(i) v); (* read in all data *) let res = ref [] in (* iterate in reverse order, to accumulate in the right order *) - for i = 0 to ncores-1 do - res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res; + for i = 0 to !ncores - 1 do + res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res; done; (* collect all results *) collect !res (* a simple iteration function that iterates on 1/nth of the data on each of the n cores *) -let simpleiter init finalize ncores compute al = +let simpleiter init finalize ncores' compute al = (* flush everything *) flush_all(); (* init task parameters *) let ln = Array.length al in - let ncores = min ln (max 1 ncores) in - let chunksize = max 1 (ln/ncores) in + set_ncores (min ln (max 1 ncores')); + let chunksize = max 1 (ln / !ncores) in log_debug "simplemapper on %d elements, on %d cores, chunksize = %d%!" - ln ncores chunksize; - (* call the GC before forking *) - Gc.compact (); - (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let lo=i*chunksize in - let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of a total of \ - %d got exception %s on core %d \n%!" - j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; - exit 1 - in - compute al lo hi exc_handler; - exit 0 - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; - (* wait for all children *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done + ln !ncores chunksize; + (* run children *) + run_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let lo=i*chunksize in + let hi=if i= !ncores - 1 then ln-1 else (i+1)*chunksize-1 in + let exc_handler e j = (* handle an exception at index j *) + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of a total of \ + %d got exception %s on core %d \n%!" + j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; + exit 1 + in + compute al lo hi exc_handler); (* return with no value *) (* a more sophisticated mapper function, with automatic load balancing *) @@ -230,24 +260,24 @@ let setup_children_chans oc pipedown ?fdarr i = let finish () = (log_debug "shutting down (pid=%d)\n%!" pid; try close_in ic; close_out oc with _ -> () - ); + ); exit 0 in receive, signal, return, finish, pid (* parametric mapper primitive that captures the parallel structure *) -let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute opid al collect = +let mapper (init:int -> unit) (finalize:unit -> unit) ncores' ~chunksize compute opid al collect = let ln = Array.length al in if ln=0 then (collect []) else begin - let ncores = min ln (max 1 ncores) in - log_debug "mapper on %d elements, on %d cores%!" ln ncores; + set_ncores (min ln (max 1 ncores')); + log_debug "mapper on %d elements, on %d cores%!" ln !ncores; match chunksize with None -> (* no need of load balancing *) - simplemapper init finalize ncores compute opid al collect - | Some v when ncores >= ln/v -> + simplemapper init finalize !ncores compute opid al collect + | Some v when !ncores >= ln/v -> (* no need of load balancing if more cores than tasks *) - simplemapper init finalize ncores compute opid al collect + simplemapper init finalize !ncores compute opid al collect | Some v -> (* init task parameters : ntasks > 0 here, as otherwise ncores >= 1 >= ln/v = ntasks and we would take @@ -256,57 +286,50 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute (* flush everything *) flush_all (); (* create descriptors to mmap *) - let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in + let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in (* setup communication channel with the workers *) - let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in + let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in let pipeup_rd,pipeup_wr=Unix.pipe () in let oc_up = Unix.out_channel_of_descr pipeup_wr in - (* call the GC before forking *) - Gc.compact (); - (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let d=Unix.gettimeofday() in - (* primitives for communication *) - Unix.close pipeup_rd; - let receive,signal,return,finish,pid = - setup_children_chans oc_up pipedown ~fdarr i in - let reschunk=ref opid in - let computetask n = (* compute chunk number n *) - let lo=n*chunksize in - let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - begin - let errmsg = Printexc.to_string e in - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of a \ - total of %d got exception %s on core %d \n%!" - j lo hi chunksize (hi-lo+1) errmsg i; - signal (Error (i,errmsg)); - finish() - end - in - reschunk:= compute al lo hi !reschunk exc_handler; - log_debug - "worker on core %d (pid=%d), segment (%d,%d) of data of \ - length %d, chunksize=%d finished in %f seconds" - i pid lo hi ln chunksize (Unix.gettimeofday() -. d) - in - while true do - (* ask for work until we are finished *) - signal (Ready i); - match receive() with - | Finished -> return (!reschunk:'d); finish () - | Task n -> computetask n - done; - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; + (* run children *) + let pids = + spawn_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let d=Unix.gettimeofday() in + (* primitives for communication *) + Unix.close pipeup_rd; + let receive,signal,return,finish,pid = + setup_children_chans oc_up pipedown ~fdarr i in + let reschunk=ref opid in + let computetask n = (* compute chunk number n *) + let lo=n*chunksize in + let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in + let exc_handler e j = (* handle an exception at index j *) + begin + let errmsg = Printexc.to_string e in + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of a \ + total of %d got exception %s on core %d \n%!" + j lo hi chunksize (hi-lo+1) errmsg i; + signal (Error (i,errmsg): msg_to_master); + finish() + end + in + reschunk:= compute al lo hi !reschunk exc_handler; + log_debug + "worker on core %d (pid=%d), segment (%d,%d) of data of \ + length %d, chunksize=%d finished in %f seconds" + i pid lo hi ln chunksize (Unix.gettimeofday() -. d) + in + while true do + (* ask for work until we are finished *) + signal (Ready i); + match receive() with + | Finished -> return (!reschunk:'d); finish () + | Task n -> computetask n + done) + in (* close unused ends of the pipes *) Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown; @@ -314,7 +337,7 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute (* get ic/oc/wfdl *) let ocs= - Array.init ncores + Array.init !ncores (fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in let ic=Unix.in_channel_of_descr pipeup_rd in @@ -325,7 +348,7 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute (log_debug "sending task %d to worker %d" i w; let oc = ocs.(w) in (Marshal.to_channel oc (Task i) []); flush oc) - | Error (core,msg) -> handle_exc core msg + | (Error (core,msg): msg_to_master) -> handle_exc core msg done; (* send termination token to all children *) @@ -336,94 +359,84 @@ let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute ) ocs; (* wait for all children to terminate *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done; + wait_for_pids pids; (* read in all data *) let res = ref [] in (* iterate in reverse order, to accumulate in the right order *) - for i = 0 to ncores-1 do - res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res; + for i = 0 to !ncores-1 do + res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res; done; (* collect all results *) collect !res end (* parametric iteration primitive that captures the parallel structure *) -let geniter init finalize ncores ~chunksize compute al = +let geniter init finalize ncores' ~chunksize compute al = let ln = Array.length al in if ln=0 then () else begin - let ncores = min ln (max 1 ncores) in - log_debug "geniter on %d elements, on %d cores%!" ln ncores; + set_ncores (min ln (max 1 ncores')); + log_debug "geniter on %d elements, on %d cores%!" ln !ncores; match chunksize with None -> - simpleiter init finalize ncores compute al (* no need of load balancing *) - | Some v when ncores >= ln/v -> - simpleiter init finalize ncores compute al (* no need of load balancing *) + simpleiter init finalize !ncores compute al (* no need of load balancing *) + | Some v when !ncores >= ln/v -> + simpleiter init finalize !ncores compute al (* no need of load balancing *) | Some v -> (* init task parameters *) let chunksize = v and ntasks = ln/v in (* flush everything *) flush_all (); (* setup communication channel with the workers *) - let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in + let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in let pipeup_rd,pipeup_wr=Unix.pipe () in let oc_up = Unix.out_channel_of_descr pipeup_wr in - (* call the GC before forking *) - Gc.compact (); (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let d=Unix.gettimeofday() in - (* primitives for communication *) - Unix.close pipeup_rd; - let receive,signal,return,finish,pid = - setup_children_chans oc_up pipedown i in - let computetask n = (* compute chunk number n *) - let lo=n*chunksize in - let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - begin - let errmsg = Printexc.to_string e in - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of \ - a total of %d got exception %s on core %d \n%!" - j lo hi chunksize (hi-lo+1) errmsg i; - signal (Error (i,errmsg)); - finish() - end - in - compute al lo hi exc_handler; - log_debug - "worker on core %d (pid=%d), segment (%d,%d) of data \ - of length %d, chunksize=%d finished in %f seconds" - i pid lo hi ln chunksize (Unix.gettimeofday() -. d) - in - while true do - (* ask for work until we are finished *) - signal (Ready i); - match receive() with - | Finished -> return(); finish () - | Task n -> computetask n - done; - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; + let pids = + spawn_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let d=Unix.gettimeofday() in + (* primitives for communication *) + Unix.close pipeup_rd; + let receive,signal,return,finish,pid = + setup_children_chans oc_up pipedown i in + let computetask n = (* compute chunk number n *) + let lo=n*chunksize in + let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in + let exc_handler e j = (* handle an exception at index j *) + begin + let errmsg = Printexc.to_string e in + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of \ + a total of %d got exception %s on core %d \n%!" + j lo hi chunksize (hi-lo+1) errmsg i; + signal (Error (i,errmsg): msg_to_master); + finish() + end + in + compute al lo hi exc_handler; + log_debug + "worker on core %d (pid=%d), segment (%d,%d) of data \ + of length %d, chunksize=%d finished in %f seconds" + i pid lo hi ln chunksize (Unix.gettimeofday() -. d) + in + while true do + (* ask for work until we are finished *) + signal (Ready i); + match receive() with + | Finished -> return(); finish () + | Task n -> computetask n + done) + in (* close unused ends of the pipes *) Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown; Unix.close pipeup_wr; (* get ic/oc/wfdl *) - let ocs=Array.init ncores + let ocs=Array.init !ncores (fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in let ic=Unix.in_channel_of_descr pipeup_rd in @@ -434,7 +447,7 @@ let geniter init finalize ncores ~chunksize compute al = (log_debug "sending task %d to worker %d" i w; let oc = ocs.(w) in (Marshal.to_channel oc (Task i) []); flush oc) - | Error (core,msg) -> handle_exc core msg + | (Error (core,msg): msg_to_master) -> handle_exc core msg done; (* send termination token to all children *) @@ -445,10 +458,7 @@ let geniter init finalize ncores ~chunksize compute al = ) ocs; (* wait for all children to terminate *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done + wait_for_pids pids; (* no data to return *) end diff --git a/parmap.mldylib b/parmap.mldylib index df08796..eb65e2d 100644 --- a/parmap.mldylib +++ b/parmap.mldylib @@ -1,3 +1,4 @@ Parmap Bytearray Parmap_utils +Setcore diff --git a/parmap.mli b/parmap.mli index 2a08377..52dba04 100644 --- a/parmap.mli +++ b/parmap.mli @@ -30,6 +30,15 @@ val set_default_ncores : int -> unit val get_default_ncores : unit -> int +(** {6 Getting ncores being used during parallel execution } *) + +val get_ncores : unit -> int + +(** {6 Getting the current worker rank. The master process has rank -1. Other processes + have the rank at which they were forked out (a worker's rank is in [0..ncores-1]) } *) + +val get_rank : unit -> int + (** {6 Sequence type, subsuming lists and arrays} *) type 'a sequence = L of 'a list | A of 'a array;; diff --git a/parmap.mllib b/parmap.mllib index df08796..eb65e2d 100644 --- a/parmap.mllib +++ b/parmap.mllib @@ -1,3 +1,4 @@ Parmap Bytearray Parmap_utils +Setcore diff --git a/setcore.mli b/setcore.ml similarity index 100% rename from setcore.mli rename to setcore.ml -- Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ocaml-maint/packages/parmap.git _______________________________________________ Pkg-ocaml-maint-commits mailing list Pkg-ocaml-maint-commits@lists.alioth.debian.org http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/pkg-ocaml-maint-commits