Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor
On Mon, Dec 14, 2015 at 12:39 PM, Johannes Sixt wrote:
>
> I can't quite parse the first sentence in this paragraph. Perhaps something
> like this:
>
>	To detect when a child has finished executing, we check interleaved
>	with other actions (such as checking the liveliness of children or
>	starting new processes) whether the stderr pipe still exists. Once a
>	child closed its stderr stream, we assume it is terminating very soon,
>	and use finish_command() from the single external process execution
>	interface to collect the exit status.

Sounds much better than my words. If a resend is necessary, I'll have
your reworded version of the paragraph instead.

>> By maintaining the strong assumption of stderr being open until the
>> very end of a child process, we can avoid other hassle such as an
>> implementation using `waitpid(-1)`, which is not implemented in Windows.
>>
>> Signed-off-by: Stefan Beller

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
[PATCH 6/8] run-command: add an asynchronous parallel child processor
This allows running external commands in parallel with ordered output
on stderr.

If we run external commands in parallel we cannot pipe the output directly
to our stdout/err as it would mix up. So each process's output will flow
through a pipe, which we buffer. One subprocess can be directly piped to
our stdout/err for low latency feedback to the user.

Example: Let's assume we have 5 submodules A,B,C,D,E and each fetch takes
a different amount of time as the different submodules vary in size, then
the output of fetches in sequential order might look like this:

 time -->
 output: |---A---| |-B-| |---C---| |-D-| |-E-|

When we schedule these submodules into maximal two parallel processes,
a schedule and sample output over time may look like this:

 process 1: |---A---| |-D-| |-E-|
 process 2: |-B-| |---C---|

 output: |---A---|B|---C---|DE

So A will be perceived as it would run normally in the single child
version. As B has finished by the time A is done, we can dump its whole
progress buffer on stderr, such that it looks like it finished in no
time. Once that is done, C is determined to be the visible child and
its progress will be reported in real time.

So this way of output is really good for human consumption, as it only
changes the timing, not the actual output.

For machine consumption the output needs to be prepared in the tasks,
by either having a prefix per line or per block to indicate whose task's
output is displayed, because the output order may not follow the
original sequential ordering:

 |A| |--B--| |-C-|

will be scheduled to be all parallel:

 process 1: |A|
 process 2: |--B--|
 process 3: |-C-|

 output: |A|CB

This happens because C finished before B did, so it will be queued for
output before B.

The detection when a child has finished executing is done the same way
as two fold. First we check regularly if the stderr pipe still exists
in an interleaved manner with other actions such as checking other
children for their liveliness or starting new children.
Once a child closed their stderr stream, we assume it is stopping very
soon, such that we can use the `finish_command` code borrowed from the
single external process execution interface.

By maintaining the strong assumption of stderr being open until the
very end of a child process, we can avoid other hassle such as an
implementation using `waitpid(-1)`, which is not implemented in Windows.

Signed-off-by: Stefan Beller
---
 run-command.c          | 335 +
 run-command.h          |  80
 t/t0061-run-command.sh |  53
 test-run-command.c     |  55 +++-
 4 files changed, 522 insertions(+), 1 deletion(-)

diff --git a/run-command.c b/run-command.c
index 13fa452..51fd72c 100644
--- a/run-command.c
+++ b/run-command.c
@@ -3,6 +3,8 @@
 #include "exec_cmd.h"
 #include "sigchain.h"
 #include "argv-array.h"
+#include "thread-utils.h"
+#include "strbuf.h"
 
 void child_process_init(struct child_process *child)
 {
@@ -865,3 +867,336 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	close(cmd->out);
 	return finish_command(cmd);
 }
+
+enum child_state {
+	GIT_CP_FREE,
+	GIT_CP_WORKING,
+	GIT_CP_WAIT_CLEANUP,
+};
+
+struct parallel_processes {
+	void *data;
+
+	int max_processes;
+	int nr_processes;
+
+	get_next_task_fn get_next_task;
+	start_failure_fn start_failure;
+	task_finished_fn task_finished;
+
+	struct {
+		enum child_state state;
+		struct child_process process;
+		struct strbuf err;
+		void *data;
+	} *children;
+	/*
+	 * The struct pollfd is logically part of *children,
+	 * but the system call expects it as its own array.
+	 */
+	struct pollfd *pfd;
+
+	unsigned shutdown : 1;
+
+	int output_owner;
+	struct strbuf buffered_output; /* of finished children */
+};
+
+static int default_start_failure(struct child_process *cp,
+				 struct strbuf *err,
+				 void *pp_cb,
+				 void *pp_task_cb)
+{
+	int i;
+
+	strbuf_addstr(err, "Starting a child failed:");
+	for (i = 0; cp->argv[i]; i++)
+		strbuf_addf(err, " %s", cp->argv[i]);
+
+	return 0;
+}
+
+static int default_task_finished(int result,
+				 struct child_process *cp,
+				 struct strbuf *err,
+				 void *pp_cb,
+				 void *pp_task_cb)
+{
+	int i;
+
+	if (!result)
+		return 0;
+
+	strbuf_addf(err, "A child failed with return code %d:", result);
+	for (i = 0; cp->argv[i]; i++)
+		strbuf_addf(err, " %s", cp->argv[i]);
+
+	return 0;
+}
+
+static void
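The comment in the struct above (the pollfd kept as its own array) is easy to miss. Here is a tiny stand-alone sketch of that layout, with hypothetical names (`struct slots`, `slot_assign`) rather than the patch's code, showing the two parallel arrays indexed by the same child slot:

```c
#include <assert.h>
#include <poll.h>
#include <stdlib.h>

/*
 * Toy model: per-child bookkeeping lives in one array, while poll(2)
 * needs a contiguous array of struct pollfd, so the two are kept in
 * parallel and indexed by the same child slot.
 */
struct child_slot {
	int in_use;
	int stderr_fd;
};

struct slots {
	int n;
	struct child_slot *children;
	struct pollfd *pfd;	/* pfd[i] mirrors children[i] */
};

static void slots_init(struct slots *s, int n)
{
	int i;

	s->n = n;
	s->children = calloc(n, sizeof(*s->children));
	s->pfd = calloc(n, sizeof(*s->pfd));
	for (i = 0; i < n; i++) {
		s->children[i].stderr_fd = -1;
		s->pfd[i].fd = -1;	/* poll() skips negative fds */
		s->pfd[i].events = POLLIN;
	}
}

static void slot_assign(struct slots *s, int i, int stderr_fd)
{
	s->children[i].in_use = 1;
	s->children[i].stderr_fd = stderr_fd;
	s->pfd[i].fd = stderr_fd;	/* keep the mirror array in sync */
}
```

Parking free slots at `fd = -1` lets a single `poll(s->pfd, s->n, timeout)` cover all slots without compacting the array.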
Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor
On 14.12.2015 at 20:37, Stefan Beller wrote:
> This allows running external commands in parallel with ordered output
> on stderr.
>
> If we run external commands in parallel we cannot pipe the output directly
> to our stdout/err as it would mix up. So each process's output will flow
> through a pipe, which we buffer. One subprocess can be directly piped to
> our stdout/err for low latency feedback to the user.
>
> Example: Let's assume we have 5 submodules A,B,C,D,E and each fetch takes
> a different amount of time as the different submodules vary in size, then
> the output of fetches in sequential order might look like this:
>
>  time -->
>  output: |---A---| |-B-| |---C---| |-D-| |-E-|
>
> When we schedule these submodules into maximal two parallel processes,
> a schedule and sample output over time may look like this:
>
>  process 1: |---A---| |-D-| |-E-|
>  process 2: |-B-| |---C---|
>
>  output: |---A---|B|---C---|DE
>
> So A will be perceived as it would run normally in the single child
> version. As B has finished by the time A is done, we can dump its whole
> progress buffer on stderr, such that it looks like it finished in no
> time. Once that is done, C is determined to be the visible child and
> its progress will be reported in real time.
>
> So this way of output is really good for human consumption, as it only
> changes the timing, not the actual output.
>
> For machine consumption the output needs to be prepared in the tasks,
> by either having a prefix per line or per block to indicate whose task's
> output is displayed, because the output order may not follow the
> original sequential ordering:
>
>  |A| |--B--| |-C-|
>
> will be scheduled to be all parallel:
>
>  process 1: |A|
>  process 2: |--B--|
>  process 3: |-C-|
>
>  output: |A|CB
>
> This happens because C finished before B did, so it will be queued for
> output before B.
>
> The detection when a child has finished executing is done the same way
> as two fold. First we check regularly if the stderr pipe still exists
> in an interleaved manner with other actions such as checking other
> children for their liveliness or starting new children. Once a child
> closed their stderr stream, we assume it is stopping very soon, such
> that we can use the `finish_command` code borrowed from the single
> external process execution interface.

I can't quite parse the first sentence in this paragraph. Perhaps
something like this:

	To detect when a child has finished executing, we check interleaved
	with other actions (such as checking the liveliness of children or
	starting new processes) whether the stderr pipe still exists. Once a
	child closed its stderr stream, we assume it is terminating very soon,
	and use finish_command() from the single external process execution
	interface to collect the exit status.

> By maintaining the strong assumption of stderr being open until the
> very end of a child process, we can avoid other hassle such as an
> implementation using `waitpid(-1)`, which is not implemented in Windows.
>
> Signed-off-by: Stefan Beller
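The mechanism Johannes rewords here can be illustrated stand-alone. The following is a sketch under POSIX assumptions, not the patch's code (`wait_via_stderr_eof` is a made-up name): EOF on the child's stderr pipe serves as the "child is about to exit" signal, after which a plain blocking `waitpid()` on that one pid collects the status, so `waitpid(-1)` is never needed:

```c
#include <assert.h>
#include <poll.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Sketch: fork a child whose stderr goes through a pipe, poll/read the
 * pipe until EOF (the child closed stderr, i.e. it is terminating),
 * then reap it with waitpid() and return its exit code.
 */
static int wait_via_stderr_eof(void)
{
	int fds[2];
	int status;
	pid_t pid;

	if (pipe(fds) < 0)
		return -1;
	pid = fork();
	if (pid < 0)
		return -1;
	if (!pid) {
		/* child: route stderr through the pipe, then exit */
		close(fds[0]);
		dup2(fds[1], 2);
		close(fds[1]);
		fprintf(stderr, "child progress...\n");
		_exit(42);	/* exiting implicitly closes stderr */
	}
	close(fds[1]);
	for (;;) {
		struct pollfd pfd = { .fd = fds[0], .events = POLLIN };
		char buf[256];
		ssize_t n;

		/* a real scheduler interleaves other bookkeeping here */
		poll(&pfd, 1, 100);
		n = read(fds[0], buf, sizeof(buf));
		if (n == 0)
			break;	/* EOF: child closed stderr, terminating soon */
		if (n < 0)
			return -1;
		/* n > 0: this is where output would be buffered */
	}
	close(fds[0]);
	if (waitpid(pid, &status, 0) < 0)
		return -1;
	return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

The blocking `waitpid()` is safe precisely because of the "stderr stays open until the very end" assumption the commit message states.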
Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor
Junio C Hamano writes:

> I may have comments on other parts of this patch, but I noticed this
> a bit hard to read while reading the end result.
> ...

I finished reading the remainder. Other than the above all look
sensible. Will replace what had been queued.

Thanks.
Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor
On Tue, Sep 29, 2015 at 8:12 PM, Junio C Hamano wrote:
> Stefan Beller writes:
>
>> +	while (1) {
>> +		int i;
>> +		int output_timeout = 100;
>> +		int spawn_cap = 4;
>> +
>> +		if (!no_more_task) {
>> +			for (i = 0; i < spawn_cap; i++) {
>> +				int code;
>> +				if (pp->nr_processes == pp->max_processes)
>> +					break;
>> +
>> +				code = pp_start_one(pp);
>> +				if (!code)
>> +					continue;
>> +				if (code < 0) {
>> +					pp->shutdown = 1;
>> +					kill_children(pp, SIGTERM);
>> +				}
>> +				no_more_task = 1;
>> +				break;
>> +			}
>> +		}
>> +		if (no_more_task && !pp->nr_processes)
>> +			break;
>
> I may have comments on other parts of this patch, but I noticed this
> a bit hard to read while reading the end result.
>
> Losing the outer "if (!no_more_task)" and replacing the above with
>
>	for (no_more_task = 0, i = 0;
>	     !no_more_task && i < spawn_cap;
>	     i++) {
>		... do things that may or may not set
>		... no_more_task
>	}
>	if (no_more_task && ...)
>		break;
>
> would make it clear that regardless of spawn_cap, no_more_task is
> honored.
>
> Also I think that having the outer "if (!no_more_task)" and not
> having "no_more_task = 0" after each iteration is buggy. Even when
> next_task() told start_one() that it does not have more tasks for
> now, as long as there are running processes, it is entirely plausible
> that the next call to next_task() can return "now we have some more
> task to do".
>
> Although I think it would make it unsightly, if you want to have the
> large indentation that protects the spawn_cap loop from getting
> entered, the condition would be
>
>	if (!pp->shutdown) {
>		for (... spawn_cap loop ...) {
>			...
>		}
>	}
>
> That structure could make sense. But even then I would probably
> write it more like
>
>	...
>	int spawn_cap = 4;
>
>	pp = pp_init(...);
>	while (1) {
>		int no_more_task = 0;
>
>		for (i = 0;
>		     !no_more_task && !pp->shutdown && i < spawn_cap;
>		     i++) {
>			...
>			code = start_one();
>			... set no_more_task to 1 as needed
>			... set pp->shutdown to 1 as needed
>		}
>		if (no_more_task && !pp->nr_processes)
>			break;
>		buffer_stderr(...);
>		output(...);
>		collect(...);
>	}

That is indeed better to read. Though if we reset `no_more_task` each
iteration of the loop by having its declaration inside the loop, the
condition for exiting the loop needs to be:

	if ((no_more_task || pp->shutdown) && !pp->nr_processes)
		break;

for correctness.

When looking at that condition, I realized that I implicitly assumed
the workflow where get_next_task returns 0 intermittently to be a
second class citizen. I reworded the documentation there as well.

> That is, you need to have two independent conditions that tell you
> not to spawn any new task:
>
> (1) You called start_one() repeatedly and next_task() said "nothing
>     more for now", so you know calling start_one() one more time
>     without changing other conditions (like draining output from
>     running processes and culling finished ones) will not help.
>
>     Letting other parts of the application that uses this scheduler
>     loop (i.e. drain output, cull finished process, etc.) may
>     change the situation and you _do_ need to call start_one() when
>     the next_task() merely said "nothing more for now".
>
>     That is what no_more_task controls.
>
> (2) The application said "I want the system to be gracefully shut
>     down". next_task() may also have said "nothing more for now"
>     and you may have set no_more_task in response to it, but unlike
>     (1) above, draining and culling must be done only to shut the
>     system down, the application does not want new processes to be
>     added. You do not want to enter the spawn_cap loop when it
>     happens.
>
>     That is what pp->shutdown controls.
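The corrected exit condition can be exercised without real processes. The following toy model uses hypothetical `sim_*` names, counters instead of children, and reaps one pretend child per iteration; it follows the loop shape Junio sketched plus the `(no_more_task || pp->shutdown)` fix from this reply:

```c
#include <assert.h>

/*
 * Toy model of the scheduler loop: "starting" a task bumps a counter
 * and "collecting" pretends exactly one running child exits per
 * iteration.  No real processes are involved.
 */
struct sim_pp {
	int nr_processes;
	int max_processes;
	int shutdown;
	int tasks_left;
	int started;
	int finished;
};

/* returns nonzero when get_next_task would say "nothing more for now" */
static int sim_start_one(struct sim_pp *pp)
{
	if (!pp->tasks_left)
		return 1;
	pp->tasks_left--;
	pp->started++;
	pp->nr_processes++;
	return 0;
}

static void sim_collect_finished(struct sim_pp *pp)
{
	if (pp->nr_processes) {
		pp->nr_processes--;
		pp->finished++;
	}
}

static void sim_run(struct sim_pp *pp)
{
	const int spawn_cap = 4;

	while (1) {
		int i, no_more_task = 0;

		for (i = 0;
		     !no_more_task && !pp->shutdown && i < spawn_cap;
		     i++) {
			if (pp->nr_processes == pp->max_processes)
				break;
			if (sim_start_one(pp))
				no_more_task = 1;
		}
		/* corrected exit condition: shutdown must also drain */
		if ((no_more_task || pp->shutdown) && !pp->nr_processes)
			break;
		sim_collect_finished(pp); /* stands in for buffer/output/collect */
	}
}
```

With `max_processes = 2` and five tasks, the loop starts all five, never exceeds two in flight, and only exits once every started child has been collected; with `shutdown` preset, it spawns nothing and returns immediately.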
Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor
Stefan Beller writes:

> +	while (1) {
> +		int i;
> +		int output_timeout = 100;
> +		int spawn_cap = 4;
> +
> +		if (!no_more_task) {
> +			for (i = 0; i < spawn_cap; i++) {
> +				int code;
> +				if (pp->nr_processes == pp->max_processes)
> +					break;
> +
> +				code = pp_start_one(pp);
> +				if (!code)
> +					continue;
> +				if (code < 0) {
> +					pp->shutdown = 1;
> +					kill_children(pp, SIGTERM);
> +				}
> +				no_more_task = 1;
> +				break;
> +			}
> +		}
> +		if (no_more_task && !pp->nr_processes)
> +			break;

I may have comments on other parts of this patch, but I noticed this
a bit hard to read while reading the end result.

Losing the outer "if (!no_more_task)" and replacing the above with

	for (no_more_task = 0, i = 0;
	     !no_more_task && i < spawn_cap;
	     i++) {
		... do things that may or may not set
		... no_more_task
	}
	if (no_more_task && ...)
		break;

would make it clear that regardless of spawn_cap, no_more_task is
honored.

Also I think that having the outer "if (!no_more_task)" and not
having "no_more_task = 0" after each iteration is buggy. Even when
next_task() told start_one() that it does not have more tasks for
now, as long as there are running processes, it is entirely plausible
that the next call to next_task() can return "now we have some more
task to do".

Although I think it would make it unsightly, if you want to have the
large indentation that protects the spawn_cap loop from getting
entered, the condition would be

	if (!pp->shutdown) {
		for (... spawn_cap loop ...) {
			...
		}
	}

That structure could make sense. But even then I would probably
write it more like

	...
	int spawn_cap = 4;

	pp = pp_init(...);
	while (1) {
		int no_more_task = 0;

		for (i = 0;
		     !no_more_task && !pp->shutdown && i < spawn_cap;
		     i++) {
			...
			code = start_one();
			... set no_more_task to 1 as needed
			... set pp->shutdown to 1 as needed
		}
		if (no_more_task && !pp->nr_processes)
			break;
		buffer_stderr(...);
		output(...);
		collect(...);
	}

That is, you need to have two independent conditions that tell you
not to spawn any new task:

(1) You called start_one() repeatedly and next_task() said "nothing
    more for now", so you know calling start_one() one more time
    without changing other conditions (like draining output from
    running processes and culling finished ones) will not help.

    Letting other parts of the application that uses this scheduler
    loop (i.e. drain output, cull finished process, etc.) may
    change the situation and you _do_ need to call start_one() when
    the next_task() merely said "nothing more for now".

    That is what no_more_task controls.

(2) The application said "I want the system to be gracefully shut
    down". next_task() may also have said "nothing more for now"
    and you may have set no_more_task in response to it, but unlike
    (1) above, draining and culling must be done only to shut the
    system down, the application does not want new processes to be
    added. You do not want to enter the spawn_cap loop when it
    happens.

    That is what pp->shutdown controls.
[PATCH 6/8] run-command: add an asynchronous parallel child processor
This allows running external commands in parallel with ordered output
on stderr.

If we run external commands in parallel we cannot pipe the output directly
to our stdout/err as it would mix up. So each process's output will flow
through a pipe, which we buffer. One subprocess can be directly piped to
our stdout/err for low latency feedback to the user.

Example: Let's assume we have 5 submodules A,B,C,D,E and each fetch takes
a different amount of time as the different submodules vary in size, then
the output of fetches in sequential order might look like this:

 time -->
 output: |---A---| |-B-| |---C---| |-D-| |-E-|

When we schedule these submodules into maximal two parallel processes,
a schedule and sample output over time may look like this:

 process 1: |---A---| |-D-| |-E-|
 process 2: |-B-| |---C---|

 output: |---A---|B|---C---|DE

So A will be perceived as it would run normally in the single child
version. As B has finished by the time A is done, we can dump its whole
progress buffer on stderr, such that it looks like it finished in no
time. Once that is done, C is determined to be the visible child and
its progress will be reported in real time.

So this way of output is really good for human consumption, as it only
changes the timing, not the actual output.

For machine consumption the output needs to be prepared in the tasks,
by either having a prefix per line or per block to indicate whose task's
output is displayed, because the output order may not follow the
original sequential ordering:

 |A| |--B--| |-C-|

will be scheduled to be all parallel:

 process 1: |A|
 process 2: |--B--|
 process 3: |-C-|

 output: |A|CB

This happens because C finished before B did, so it will be queued for
output before B.
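The |A|CB reordering described above can be modeled in a few lines. This is a deliberately simplified sketch with hypothetical names (`sim_child`, `simulate_output`), not the patch's implementation: one fixed output owner streams live, and every other child's buffer is queued in the order that child finishes, then flushed:

```c
#include <assert.h>
#include <string.h>

/*
 * Toy model of the output ordering: child 0 plays the output owner
 * whose output is shown live; all other children are appended to a
 * shared buffer in finish order (no ownership hand-off is modeled).
 */
struct sim_child {
	const char *name;
	int finish_time;
};

/* writes the perceived output order into `out` */
static void simulate_output(const struct sim_child *c, int n, char *out)
{
	int order[16];
	int i, j, m = 0;

	out[0] = '\0';
	strcat(out, c[0].name);	/* the owner's output streams live */

	/* everyone else lands in buffered_output in finish order */
	for (i = 1; i < n; i++)
		order[m++] = i;
	for (i = 0; i < m; i++)
		for (j = i + 1; j < m; j++)
			if (c[order[j]].finish_time < c[order[i]].finish_time) {
				int t = order[i];
				order[i] = order[j];
				order[j] = t;
			}
	for (i = 0; i < m; i++)
		strcat(out, c[order[i]].name);
}
```

Feeding it the second example (A short, B long, C medium) reproduces the commit message's point: C is queued before B because C finished first.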
Signed-off-by: Stefan Beller
Signed-off-by: Junio C Hamano
---
 run-command.c          | 348 +
 run-command.h          |  63 +
 t/t0061-run-command.sh |  20 +++
 test-run-command.c     |  24
 4 files changed, 455 insertions(+)

diff --git a/run-command.c b/run-command.c
index 28e1d55..df84985 100644
--- a/run-command.c
+++ b/run-command.c
@@ -3,6 +3,8 @@
 #include "exec_cmd.h"
 #include "sigchain.h"
 #include "argv-array.h"
+#include "thread-utils.h"
+#include "strbuf.h"
 
 void child_process_init(struct child_process *child)
 {
@@ -852,3 +854,349 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
 	close(cmd->out);
 	return finish_command(cmd);
 }
+
+struct parallel_processes {
+	void *data;
+
+	int max_processes;
+	int nr_processes;
+
+	get_next_task_fn get_next_task;
+	start_failure_fn start_failure;
+	return_value_fn return_value;
+
+	struct {
+		unsigned in_use : 1;
+		struct child_process process;
+		struct strbuf err;
+	} *children;
+	/*
+	 * The struct pollfd is logically part of *children,
+	 * but the system call expects it as its own array.
+	 */
+	struct pollfd *pfd;
+
+	unsigned shutdown : 1;
+
+	int output_owner;
+	struct strbuf buffered_output; /* of finished children */
+} parallel_processes_struct;
+
+static int default_start_failure(void *data,
+				 struct child_process *cp,
+				 struct strbuf *err)
+{
+	int i;
+
+	strbuf_addstr(err, "Starting a child failed:");
+	for (i = 0; cp->argv[i]; i++)
+		strbuf_addf(err, " %s", cp->argv[i]);
+
+	return 0;
+}
+
+static int default_return_value(void *data,
+				struct child_process *cp,
+				struct strbuf *err,
+				int result)
+{
+	int i;
+
+	if (!result)
+		return 0;
+
+	strbuf_addf(err, "A child failed with return code %d:", result);
+	for (i = 0; cp->argv[i]; i++)
+		strbuf_addf(err, " %s", cp->argv[i]);
+
+	return 0;
+}
+
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+	int i, n = pp->max_processes;
+
+	for (i = 0; i < n; i++)
+		if (pp->children[i].in_use)
+			kill(pp->children[i].process.pid, signo);
+}
+
+static void handle_children_on_signal(int signo)
+{
+	struct parallel_processes *pp = &parallel_processes_struct;
+
+	kill_children(pp, signo);
+	sigchain_pop(signo);
+	raise(signo);
+}
+
+static struct parallel_processes *pp_init(int n, void *data,
+					  get_next_task_fn get_next_task,
+					  start_failure_fn start_failure,
+					  return_value_fn return_value)
+{
+	int i;
+	struct parallel_processes *pp = &parallel_processes_struct;
+
+	if (n < 1)
+		n =