Hello community, here is the log from the commit of package ocaml-parmap for openSUSE:Factory checked in at 2017-08-18 15:03:27 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/ocaml-parmap (Old) and /work/SRC/openSUSE:Factory/.ocaml-parmap.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "ocaml-parmap" Fri Aug 18 15:03:27 2017 rev:3 rq:517015 version:20170223.c9b0ee7 Changes: -------- --- /work/SRC/openSUSE:Factory/ocaml-parmap/ocaml-parmap.changes 2017-06-09 15:56:47.700855148 +0200 +++ /work/SRC/openSUSE:Factory/.ocaml-parmap.new/ocaml-parmap.changes 2017-08-18 15:03:27.994074563 +0200 @@ -1,0 +2,26 @@ +Tue Jul 25 13:04:54 UTC 2017 - [email protected] + +- Use Group: Development/Languages/OCaml + +------------------------------------------------------------------- +Mon Jul 17 20:24:18 UTC 2017 - [email protected] + +- Update to version 20170223.c9b0ee7 + Only wait for the children started by parmap + Prevent at_exit handlers registered in the parent from running in the child + handle EINTR errors in wait_for_pids + added get_ncores + Gc.compact needs to be called only inside of spawn_many + added get_rank so that worker processes can know at which rank they were forked out of the master process + +------------------------------------------------------------------- +Mon Jul 17 12:48:58 UTC 2017 - [email protected] + +- Wrap specfile conditionals to fix quilt setup + +------------------------------------------------------------------- +Mon Jul 17 10:48:58 UTC 2017 - [email protected] + +- Remove autodeps for pre openSUSE 12.1 releases + +------------------------------------------------------------------- Old: ---- ocaml-parmap-20160605.10e2437.tar.xz New: ---- ocaml-parmap-20170223.c9b0ee7.tar.xz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ ocaml-parmap.spec ++++++ --- /var/tmp/diff_new_pack.9nkM6T/_old 2017-08-18 15:03:28.973936628 +0200 +++ /var/tmp/diff_new_pack.9nkM6T/_new 2017-08-18 15:03:28.977936065 +0200 @@ -17,12 +17,12 @@ Name: ocaml-parmap -Version: 20160605.10e2437 +Version: 20170223.c9b0ee7 Release: 0 -%{ocaml_preserve_bytecode} +%{?ocaml_preserve_bytecode} Summary: Exploit multicore architectures for OCaml programs with minimal modifications License: 
LGPL-2.0 -Group: Development/Libraries/Other +Group: Development/Languages/OCaml Url: http://rdicosmo.github.io/parmap/ Source: %{name}-%{version}.tar.xz BuildRequires: ocaml @@ -34,12 +34,6 @@ BuildRequires: ocamlfind(unix) BuildRequires: pkgconfig(x11) BuildRoot: %{_tmppath}/%{name}-%{version}-build -# ocaml autodep start for pkg: ocaml-parmap -# hardcoded rpm dependency for pre 12.1 to compensate for lack of ocaml() provides/requires -%if 0%{?suse_version} < 1210 -Requires: ocaml-runtime -%endif -# ocaml autodep end for pkg: ocaml-parmap %description If you want to use your many cores to accelerate an operation @@ -52,7 +46,7 @@ %package devel Summary: Development files for %{name} -Group: Development/Libraries/Other +Group: Development/Languages/OCaml Requires: %{name} = %{version} %description devel @@ -89,7 +83,7 @@ Install: true Modules: Parmap CSources: bytearray_stubs.c, setcore_stubs.c, config.h - CCOpt: -Wall -O2 -g -I$PWD -Werror -D_GNU_SOURCE + CCOpt: %{optflags} -I$PWD -Werror -D_GNU_SOURCE Document parmap Title: API reference for parmap @@ -132,7 +126,7 @@ %{_bindir}/* %dir %{_libdir}/ocaml %dir %{_libdir}/ocaml/* -%if %{ocaml_native_compiler} +%if 0%{?ocaml_native_compiler} %{_libdir}/ocaml/*/*.cmxs %endif %{_libdir}/ocaml/*/*.so @@ -143,7 +137,7 @@ %{oasis_docdir_html} %dir %{_libdir}/ocaml/* %{_libdir}/ocaml/*/*.a -%if %{ocaml_native_compiler} +%if 0%{?ocaml_native_compiler} %{_libdir}/ocaml/*/*.cmx %{_libdir}/ocaml/*/*.cmxa %endif ++++++ ocaml-parmap-20160605.10e2437.tar.xz -> ocaml-parmap-20170223.c9b0ee7.tar.xz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ocaml-parmap-20160605.10e2437/parmap.ml new/ocaml-parmap-20170223.c9b0ee7/parmap.ml --- old/ocaml-parmap-20160605.10e2437/parmap.ml 2016-06-24 00:36:19.000000000 +0200 +++ new/ocaml-parmap-20170223.c9b0ee7/parmap.ml 2017-02-24 01:29:58.000000000 +0100 @@ -30,6 +30,19 @@ let set_default_ncores n = default_ncores := n;; let 
get_default_ncores () = !default_ncores;; +let ncores = ref 0;; + +let set_ncores n = ncores := n;; +let get_ncores () = !ncores + +(* worker process rank *) + +let masters_rank = -1 +let rank = ref masters_rank + +let set_rank n = rank := n +let get_rank () = !rank + (* exception handling code *) let handle_exc core msg = @@ -117,100 +130,113 @@ let s = Marshal.to_string v [Marshal.Closures] in ignore(Bytearray.mmap_of_string fd s) +(* Exit the program without calling [at_exit] handlers *) +external sys_exit : int -> 'a = "caml_sys_exit" + +let spawn_many n ~in_subprocess = + let rec loop i acc = + if i = n then + acc + else + match Unix.fork() with + 0 -> + (* [at_exit] handlers are called in reverse order of registration. + By registering a handler that exits prematurely, we prevent the + execution of handlers registered before the fork. + + This ignores the exit code provided by the user, but we ignore + it anyway in [wait_for_pids]. + *) + at_exit (fun () -> sys_exit 0); + set_rank i; + in_subprocess i; + exit 0 + | -1 -> + Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; + loop (i + 1) acc + | pid -> + loop (i + 1) (pid :: acc) + in + (* call the GC before forking *) + Gc.compact (); + loop 0 [] + +let wait_for_pids pids = + let rec wait_for_pid pid = + try ignore(Unix.waitpid [] pid : int * Unix.process_status) + with + | Unix.Unix_error (Unix.ECHILD, _, _) -> () + | Unix.Unix_error (Unix.EINTR, _, _) -> wait_for_pid pid + in + List.iter wait_for_pid pids + +let run_many n ~in_subprocess = + wait_for_pids (spawn_many n ~in_subprocess) + (* a simple mapper function that computes 1/nth of the data on each of the n cores in one iteration *) -let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores compute opid al collect = +let simplemapper (init:int -> unit) (finalize: unit -> unit) ncores' compute opid al collect = (* flush everything *) flush_all(); (* init task parameters *) let ln = Array.length al in - let ncores = min ln (max 1 
ncores) in - let chunksize = max 1 (ln/ncores) in + set_ncores (min ln (max 1 ncores')); + let chunksize = max 1 (ln / !ncores) in log_debug "simplemapper on %d elements, on %d cores, chunksize = %d%!" - ln ncores chunksize; + ln !ncores chunksize; (* create descriptors to mmap *) - let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in - (* call the GC before forking *) - Gc.compact (); - (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let lo=i*chunksize in - let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of a total of \ - %d got exception %s on core %d \n%!" - j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; - exit 1 - in - let v = compute al lo hi opid exc_handler in - marshal fdarr.(i) v; - exit 0 - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; - (* wait for all children *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done; + let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in + (* run children *) + run_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let lo=i*chunksize in + let hi=if i = !ncores - 1 then ln - 1 else (i + 1) * chunksize - 1 in + let exc_handler e j = (* handle an exception at index j *) + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of a total of \ + %d got exception %s on core %d \n%!" 
+ j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; + exit 1 + in + let v = compute al lo hi opid exc_handler in + marshal fdarr.(i) v); (* read in all data *) let res = ref [] in (* iterate in reverse order, to accumulate in the right order *) - for i = 0 to ncores-1 do - res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res; + for i = 0 to !ncores - 1 do + res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res; done; (* collect all results *) collect !res (* a simple iteration function that iterates on 1/nth of the data on each of the n cores *) -let simpleiter init finalize ncores compute al = +let simpleiter init finalize ncores' compute al = (* flush everything *) flush_all(); (* init task parameters *) let ln = Array.length al in - let ncores = min ln (max 1 ncores) in - let chunksize = max 1 (ln/ncores) in + set_ncores (min ln (max 1 ncores')); + let chunksize = max 1 (ln / !ncores) in log_debug "simplemapper on %d elements, on %d cores, chunksize = %d%!" - ln ncores chunksize; - (* call the GC before forking *) - Gc.compact (); - (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let lo=i*chunksize in - let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of a total of \ - %d got exception %s on core %d \n%!" 
- j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; - exit 1 - in - compute al lo hi exc_handler; - exit 0 - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; - (* wait for all children *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done + ln !ncores chunksize; + (* run children *) + run_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let lo=i*chunksize in + let hi=if i= !ncores - 1 then ln-1 else (i+1)*chunksize-1 in + let exc_handler e j = (* handle an exception at index j *) + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of a total of \ + %d got exception %s on core %d \n%!" + j lo hi chunksize (hi-lo+1) (Printexc.to_string e) i; + exit 1 + in + compute al lo hi exc_handler); (* return with no value *) (* a more sophisticated mapper function, with automatic load balancing *) @@ -234,24 +260,24 @@ let finish () = (log_debug "shutting down (pid=%d)\n%!" pid; try close_in ic; close_out oc with _ -> () - ); + ); exit 0 in receive, signal, return, finish, pid (* parametric mapper primitive that captures the parallel structure *) -let mapper (init:int -> unit) (finalize:unit -> unit) ncores ~chunksize compute opid al collect = +let mapper (init:int -> unit) (finalize:unit -> unit) ncores' ~chunksize compute opid al collect = let ln = Array.length al in if ln=0 then (collect []) else begin - let ncores = min ln (max 1 ncores) in - log_debug "mapper on %d elements, on %d cores%!" ln ncores; + set_ncores (min ln (max 1 ncores')); + log_debug "mapper on %d elements, on %d cores%!" 
ln !ncores; match chunksize with None -> (* no need of load balancing *) - simplemapper init finalize ncores compute opid al collect - | Some v when ncores >= ln/v -> + simplemapper init finalize !ncores compute opid al collect + | Some v when !ncores >= ln/v -> (* no need of load balancing if more cores than tasks *) - simplemapper init finalize ncores compute opid al collect + simplemapper init finalize !ncores compute opid al collect | Some v -> (* init task parameters : ntasks > 0 here, as otherwise ncores >= 1 >= ln/v = ntasks and we would take @@ -260,57 +286,50 @@ (* flush everything *) flush_all (); (* create descriptors to mmap *) - let fdarr=Array.init ncores (fun _ -> Utils.tempfd()) in + let fdarr=Array.init !ncores (fun _ -> Utils.tempfd()) in (* setup communication channel with the workers *) - let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in + let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in let pipeup_rd,pipeup_wr=Unix.pipe () in let oc_up = Unix.out_channel_of_descr pipeup_wr in - (* call the GC before forking *) - Gc.compact (); - (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let d=Unix.gettimeofday() in - (* primitives for communication *) - Unix.close pipeup_rd; - let receive,signal,return,finish,pid = - setup_children_chans oc_up pipedown ~fdarr i in - let reschunk=ref opid in - let computetask n = (* compute chunk number n *) - let lo=n*chunksize in - let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - begin - let errmsg = Printexc.to_string e in - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of a \ - total of %d got exception %s on core %d \n%!" 
- j lo hi chunksize (hi-lo+1) errmsg i; - signal (Error (i,errmsg): msg_to_master); - finish() - end - in - reschunk:= compute al lo hi !reschunk exc_handler; - log_debug - "worker on core %d (pid=%d), segment (%d,%d) of data of \ - length %d, chunksize=%d finished in %f seconds" - i pid lo hi ln chunksize (Unix.gettimeofday() -. d) - in - while true do - (* ask for work until we are finished *) - signal (Ready i); - match receive() with - | Finished -> return (!reschunk:'d); finish () - | Task n -> computetask n - done; - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; + (* run children *) + let pids = + spawn_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let d=Unix.gettimeofday() in + (* primitives for communication *) + Unix.close pipeup_rd; + let receive,signal,return,finish,pid = + setup_children_chans oc_up pipedown ~fdarr i in + let reschunk=ref opid in + let computetask n = (* compute chunk number n *) + let lo=n*chunksize in + let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in + let exc_handler e j = (* handle an exception at index j *) + begin + let errmsg = Printexc.to_string e in + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of a \ + total of %d got exception %s on core %d \n%!" + j lo hi chunksize (hi-lo+1) errmsg i; + signal (Error (i,errmsg): msg_to_master); + finish() + end + in + reschunk:= compute al lo hi !reschunk exc_handler; + log_debug + "worker on core %d (pid=%d), segment (%d,%d) of data of \ + length %d, chunksize=%d finished in %f seconds" + i pid lo hi ln chunksize (Unix.gettimeofday() -. 
d) + in + while true do + (* ask for work until we are finished *) + signal (Ready i); + match receive() with + | Finished -> return (!reschunk:'d); finish () + | Task n -> computetask n + done) + in (* close unused ends of the pipes *) Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown; @@ -318,7 +337,7 @@ (* get ic/oc/wfdl *) let ocs= - Array.init ncores + Array.init !ncores (fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in let ic=Unix.in_channel_of_descr pipeup_rd in @@ -340,94 +359,84 @@ ) ocs; (* wait for all children to terminate *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done; + wait_for_pids pids; (* read in all data *) let res = ref [] in (* iterate in reverse order, to accumulate in the right order *) - for i = 0 to ncores-1 do - res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res; + for i = 0 to !ncores-1 do + res:= ((unmarshal fdarr.((!ncores-1)-i)):'d)::!res; done; (* collect all results *) collect !res end (* parametric iteration primitive that captures the parallel structure *) -let geniter init finalize ncores ~chunksize compute al = +let geniter init finalize ncores' ~chunksize compute al = let ln = Array.length al in if ln=0 then () else begin - let ncores = min ln (max 1 ncores) in - log_debug "geniter on %d elements, on %d cores%!" ln ncores; + set_ncores (min ln (max 1 ncores')); + log_debug "geniter on %d elements, on %d cores%!" 
ln !ncores; match chunksize with None -> - simpleiter init finalize ncores compute al (* no need of load balancing *) - | Some v when ncores >= ln/v -> - simpleiter init finalize ncores compute al (* no need of load balancing *) + simpleiter init finalize !ncores compute al (* no need of load balancing *) + | Some v when !ncores >= ln/v -> + simpleiter init finalize !ncores compute al (* no need of load balancing *) | Some v -> (* init task parameters *) let chunksize = v and ntasks = ln/v in (* flush everything *) flush_all (); (* setup communication channel with the workers *) - let pipedown=Array.init ncores (fun _ -> Unix.pipe ()) in + let pipedown=Array.init !ncores (fun _ -> Unix.pipe ()) in let pipeup_rd,pipeup_wr=Unix.pipe () in let oc_up = Unix.out_channel_of_descr pipeup_wr in - (* call the GC before forking *) - Gc.compact (); (* spawn children *) - for i = 0 to ncores-1 do - match Unix.fork() with - 0 -> - begin - init i; (* call initialization function *) - Pervasives.at_exit finalize; (* register finalization function *) - let d=Unix.gettimeofday() in - (* primitives for communication *) - Unix.close pipeup_rd; - let receive,signal,return,finish,pid = - setup_children_chans oc_up pipedown i in - let computetask n = (* compute chunk number n *) - let lo=n*chunksize in - let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in - let exc_handler e j = (* handle an exception at index j *) - begin - let errmsg = Printexc.to_string e in - Utils.log_error - "error at index j=%d in (%d,%d), chunksize=%d of \ - a total of %d got exception %s on core %d \n%!" - j lo hi chunksize (hi-lo+1) errmsg i; - signal (Error (i,errmsg): msg_to_master); - finish() - end - in - compute al lo hi exc_handler; - log_debug - "worker on core %d (pid=%d), segment (%d,%d) of data \ - of length %d, chunksize=%d finished in %f seconds" - i pid lo hi ln chunksize (Unix.gettimeofday() -. 
d) - in - while true do - (* ask for work until we are finished *) - signal (Ready i); - match receive() with - | Finished -> return(); finish () - | Task n -> computetask n - done; - end - | -1 -> Utils.log_error "fork error: pid %d; i=%d" (Unix.getpid()) i; - | _pid -> () - done; + let pids = + spawn_many !ncores ~in_subprocess:(fun i -> + init i; (* call initialization function *) + Pervasives.at_exit finalize; (* register finalization function *) + let d=Unix.gettimeofday() in + (* primitives for communication *) + Unix.close pipeup_rd; + let receive,signal,return,finish,pid = + setup_children_chans oc_up pipedown i in + let computetask n = (* compute chunk number n *) + let lo=n*chunksize in + let hi=if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in + let exc_handler e j = (* handle an exception at index j *) + begin + let errmsg = Printexc.to_string e in + Utils.log_error + "error at index j=%d in (%d,%d), chunksize=%d of \ + a total of %d got exception %s on core %d \n%!" + j lo hi chunksize (hi-lo+1) errmsg i; + signal (Error (i,errmsg): msg_to_master); + finish() + end + in + compute al lo hi exc_handler; + log_debug + "worker on core %d (pid=%d), segment (%d,%d) of data \ + of length %d, chunksize=%d finished in %f seconds" + i pid lo hi ln chunksize (Unix.gettimeofday() -. 
d) + in + while true do + (* ask for work until we are finished *) + signal (Ready i); + match receive() with + | Finished -> return(); finish () + | Task n -> computetask n + done) + in (* close unused ends of the pipes *) Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown; Unix.close pipeup_wr; (* get ic/oc/wfdl *) - let ocs=Array.init ncores + let ocs=Array.init !ncores (fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in let ic=Unix.in_channel_of_descr pipeup_rd in @@ -449,10 +458,7 @@ ) ocs; (* wait for all children to terminate *) - for _i = 0 to ncores-1 do - try ignore(Unix.wait()) - with Unix.Unix_error (Unix.ECHILD, _, _) -> () - done + wait_for_pids pids; (* no data to return *) end diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/ocaml-parmap-20160605.10e2437/parmap.mli new/ocaml-parmap-20170223.c9b0ee7/parmap.mli --- old/ocaml-parmap-20160605.10e2437/parmap.mli 2016-06-24 00:36:19.000000000 +0200 +++ new/ocaml-parmap-20170223.c9b0ee7/parmap.mli 2017-02-24 01:29:58.000000000 +0100 @@ -30,6 +30,15 @@ val get_default_ncores : unit -> int +(** {6 Getting ncores being used during parallel execution } *) + +val get_ncores : unit -> int + +(** {6 Getting the current worker rank. The master process has rank -1. Other processes + have the rank at which they were forked out (a worker's rank is in [0..ncores-1]) } *) + +val get_rank : unit -> int + (** {6 Sequence type, subsuming lists and arrays} *) type 'a sequence = L of 'a list | A of 'a array;;
