Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor

2015-12-14 Thread Stefan Beller
On Mon, Dec 14, 2015 at 12:39 PM, Johannes Sixt  wrote:
>
> I can't quite parse the first sentence in this paragraph. Perhaps something
> like this:
>
> To detect when a child has finished executing, we check interleaved
> with other actions (such as checking the liveliness of children or
> starting new processes) whether the stderr pipe still exists. Once a
> child closed its stderr stream, we assume it is terminating very soon,
> and use finish_command() from the single external process execution
> interface to collect the exit status.

Sounds much better than my words. If a resend is necessary, I'll have your
reworded version of the paragraph instead.

>
>
>>
>> By maintaining the strong assumption of stderr being open until the
>> very end of a child process, we can avoid other hassle such as an
>> implementation using `waitpid(-1)`, which is not implemented in Windows.
>>
>> Signed-off-by: Stefan Beller 
>
>
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[PATCH 6/8] run-command: add an asynchronous parallel child processor

2015-12-14 Thread Stefan Beller
This allows running external commands in parallel with ordered output
on stderr.

If we run external commands in parallel we cannot pipe the output directly
to our stdout/err as it would mix up. So each process's output will
flow through a pipe, which we buffer. One subprocess can be directly
piped to our stdout/err for low latency feedback to the user.

Example:
Let's assume we have 5 submodules A,B,C,D,E and each fetch takes a
different amount of time as the different submodules vary in size, then
the output of fetches in sequential order might look like this:

 time -->
 output: |---A---| |-B-| |---C---| |-D-| |-E-|

When we schedule these submodules into maximal two parallel processes,
a schedule and sample output over time may look like this:

process 1: |---A---| |-D-| |-E-|

process 2: |-B-| |---C---|

output:|---A---|B|---C---|DE

So A will be perceived as if it ran normally in the single child
version. As B has finished by the time A is done, we can dump its whole
progress buffer on stderr, such that it looks like it finished in no
time. Once that is done, C is determined to be the visible child and
its progress will be reported in real time.

So this way of output is really good for human consumption, as it only
changes the timing, not the actual output.

For machine consumption the output needs to be prepared in the tasks,
by either having a prefix per line or per block to indicate whose task's
output is displayed, because the output order may not follow the
original sequential ordering:

 |A| |--B--| |-C-|

will be scheduled to be all parallel:

process 1: |A|
process 2: |--B--|
process 3: |-C-|
output:|A|CB

This happens because C finished before B did, so it will be queued for
output before B.

The detection of when a child has finished executing is two fold. First
we check regularly whether the stderr pipe still exists, interleaved
with other actions such as checking other children
for their liveliness or starting new children. Once a child has closed its
stderr stream, we assume it is stopping very soon, such that we can use
the `finish_command` code borrowed from the single external process
execution interface.

By maintaining the strong assumption of stderr being open until the
very end of a child process, we can avoid other hassle such as an
implementation using `waitpid(-1)`, which is not implemented in Windows.

Signed-off-by: Stefan Beller 
---
 run-command.c  | 335 +
 run-command.h  |  80 
 t/t0061-run-command.sh |  53 
 test-run-command.c |  55 +++-
 4 files changed, 522 insertions(+), 1 deletion(-)

diff --git a/run-command.c b/run-command.c
index 13fa452..51fd72c 100644
--- a/run-command.c
+++ b/run-command.c
@@ -3,6 +3,8 @@
 #include "exec_cmd.h"
 #include "sigchain.h"
 #include "argv-array.h"
+#include "thread-utils.h"
+#include "strbuf.h"
 
 void child_process_init(struct child_process *child)
 {
@@ -865,3 +867,336 @@ int capture_command(struct child_process *cmd, struct 
strbuf *buf, size_t hint)
close(cmd->out);
return finish_command(cmd);
 }
+
+enum child_state {
+   GIT_CP_FREE,
+   GIT_CP_WORKING,
+   GIT_CP_WAIT_CLEANUP,
+};
+
+struct parallel_processes {
+   void *data;
+
+   int max_processes;
+   int nr_processes;
+
+   get_next_task_fn get_next_task;
+   start_failure_fn start_failure;
+   task_finished_fn task_finished;
+
+   struct {
+   enum child_state state;
+   struct child_process process;
+   struct strbuf err;
+   void *data;
+   } *children;
+   /*
+* The struct pollfd is logically part of *children,
+* but the system call expects it as its own array.
+*/
+   struct pollfd *pfd;
+
+   unsigned shutdown : 1;
+
+   int output_owner;
+   struct strbuf buffered_output; /* of finished children */
+};
+
+static int default_start_failure(struct child_process *cp,
+struct strbuf *err,
+void *pp_cb,
+void *pp_task_cb)
+{
+   int i;
+
+   strbuf_addstr(err, "Starting a child failed:");
+   for (i = 0; cp->argv[i]; i++)
+   strbuf_addf(err, " %s", cp->argv[i]);
+
+   return 0;
+}
+
+static int default_task_finished(int result,
+struct child_process *cp,
+struct strbuf *err,
+void *pp_cb,
+void *pp_task_cb)
+{
+   int i;
+
+   if (!result)
+   return 0;
+
+   strbuf_addf(err, "A child failed with return code %d:", result);
+   for (i = 0; cp->argv[i]; i++)
+   strbuf_addf(err, " %s", cp->argv[i]);
+
+   return 0;
+}
+
+static void 

Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor

2015-12-14 Thread Johannes Sixt

Am 14.12.2015 um 20:37 schrieb Stefan Beller:

This allows running external commands in parallel with ordered output
on stderr.

If we run external commands in parallel we cannot pipe the output directly
to our stdout/err as it would mix up. So each process's output will
flow through a pipe, which we buffer. One subprocess can be directly
piped to our stdout/err for low latency feedback to the user.

Example:
Let's assume we have 5 submodules A,B,C,D,E and each fetch takes a
different amount of time as the different submodules vary in size, then
the output of fetches in sequential order might look like this:

  time -->
  output: |---A---| |-B-| |---C---| |-D-| |-E-|

When we schedule these submodules into maximal two parallel processes,
a schedule and sample output over time may look like this:

process 1: |---A---| |-D-| |-E-|

process 2: |-B-| |---C---|

output:|---A---|B|---C---|DE

So A will be perceived as it would run normally in the single child
version. As B has finished by the time A is done, we can dump its whole
progress buffer on stderr, such that it looks like it finished in no
time. Once that is done, C is determined to be the visible child and
its progress will be reported in real time.

So this way of output is really good for human consumption, as it only
changes the timing, not the actual output.

For machine consumption the output needs to be prepared in the tasks,
by either having a prefix per line or per block to indicate whose tasks
output is displayed, because the output order may not follow the
original sequential ordering:

  |A| |--B--| |-C-|

will be scheduled to be all parallel:

process 1: |A|
process 2: |--B--|
process 3: |-C-|
output:|A|CB

This happens because C finished before B did, so it will be queued for
output before B.

The detection of when a child has finished executing is two fold. First
we check regularly whether the stderr pipe still exists, interleaved
with other actions such as checking other children
for their liveliness or starting new children. Once a child has closed its
stderr stream, we assume it is stopping very soon, such that we can use
the `finish_command` code borrowed from the single external process
execution interface.


I can't quite parse the first sentence in this paragraph. Perhaps 
something like this:


To detect when a child has finished executing, we check interleaved
with other actions (such as checking the liveliness of children or
starting new processes) whether the stderr pipe still exists. Once a
child closed its stderr stream, we assume it is terminating very soon,
and use finish_command() from the single external process execution
interface to collect the exit status.



By maintaining the strong assumption of stderr being open until the
very end of a child process, we can avoid other hassle such as an
implementation using `waitpid(-1)`, which is not implemented in Windows.

Signed-off-by: Stefan Beller 


--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html


Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor

2015-09-30 Thread Junio C Hamano
Junio C Hamano  writes:

> I may have comments on other parts of this patch, but I noticed this
> a bit hard to read while reading the end result.
> ...

I finished reading the remainder.  Other than the above all look
sensible.

Will replace what had been queued.

Thanks.
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html


Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor

2015-09-30 Thread Stefan Beller
On Tue, Sep 29, 2015 at 8:12 PM, Junio C Hamano  wrote:
> Stefan Beller  writes:
>
>> + while (1) {
>> + int i;
>> + int output_timeout = 100;
>> + int spawn_cap = 4;
>> +
>> + if (!no_more_task) {
>> + for (i = 0; i < spawn_cap; i++) {
>> + int code;
>> + if (pp->nr_processes == pp->max_processes)
>> + break;
>> +
>> + code = pp_start_one(pp);
>> + if (!code)
>> + continue;
>> + if (code < 0) {
>> + pp->shutdown = 1;
>> + kill_children(pp, SIGTERM);
>> + }
>> + no_more_task = 1;
>> + break;
>> + }
>> + }
>> + if (no_more_task && !pp->nr_processes)
>> + break;
>
> I may have comments on other parts of this patch, but I noticed this
> a bit hard to read while reading the end result.
>
> Losing the outer "if (!no_more_task)" and replacing the above with
>
> for (no_more_task = 0, i = 0;
>  !no_more_task && i < spawn_cap;
>  i++) {
> ... do things that may or may not set
> ... no_more_task
> }
> if (no_more_task && ...)
> break;
>
> would make it clear that regardless of spawn_cap, no_more_task is
> honored.
>
> Also I think that having the outer "if (!no_more_task)" and not
> having "no_more_task = 0" after each iteration is buggy.  Even when
> next_task() told start_one() that it does not have more tasks for
> now, as long as there are running processes, it is entirely plausible
> that next call to next_task() can return "now we have some more task
> to do".
>
> Although I think it would make it unsightly, if you want to have the
> large indentation that protects the spawn_cap loop from getting
> entered, the condition would be
>
> if (!pp->shutdown) {
> for (... spawn_cap loop ...) {
> ...
> }
> }
>
> That structure could make sense.  But even then I would probably
> write it more like
>
> ...
> int spawn_cap = 4;
>
> pp = pp_init(...);
> while (1) {
> int no_more_task = 0;
>
> for (i = 0;
>  !no_more_task && !pp->shutdown && i < spawn_cap;
>  i++) {
> ...
> code = start_one();
> ... set no_more_task to 1 as needed
> ... set pp->shutdown to 1 as needed
> }
> if (no_more_task && !pp->nr_processes)
> break;
> buffer_stderr(...);
> output(...);
> collect(...);
> }

That is indeed better to read.
Though if we reset `no_more_task` each iteration of the loop by
having its declaration inside the loop, the condition for exiting the
loop needs to be:

if ((no_more_task || pp->shutdown) && !pp->nr_processes)
break;

for correctness.

When looking at that condition, I realized that I implicitly assumed
the workflow, where get_next_task returns 0 intermittently, to be a
second class citizen. I reworded the documentation as well there.

>
> That is, you need to have two independent conditions that tell you
> not to spawn any new task:
>
>  (1) You called start_one() repeatedly and next_task() said "nothing
>  more for now", so you know calling start_one() one more time
>  without changing other conditions (like draining output from
>  running processes and culling finished ones) will not help.
>
>  Letting other parts of the application that uses this scheduler
>  loop (i.e. drain output, cull finished process, etc.) may
>  change the situation and you _do_ need to call start_one() when
>  the next_task() merely said "nothing more for now".
>
>  That is what no_more_task controls.
>
>  (2) The application said "I want the system to be gracefully shut
>  down".  next_task() may also have said "nothing more for now"
>  and you may have set no_more_task in response to it, but unlike
>  (1) above, draining and culling must be done only to shut the
>  system down, the application does not want new processes to be
>  added.  You do not want to enter the spawn_cap loop when it
>  happens.
>
>  That is what pp->shutdown controls.
>
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html


Re: [PATCH 6/8] run-command: add an asynchronous parallel child processor

2015-09-29 Thread Junio C Hamano
Stefan Beller  writes:

> + while (1) {
> + int i;
> + int output_timeout = 100;
> + int spawn_cap = 4;
> +
> + if (!no_more_task) {
> + for (i = 0; i < spawn_cap; i++) {
> + int code;
> + if (pp->nr_processes == pp->max_processes)
> + break;
> +
> + code = pp_start_one(pp);
> + if (!code)
> + continue;
> + if (code < 0) {
> + pp->shutdown = 1;
> + kill_children(pp, SIGTERM);
> + }
> + no_more_task = 1;
> + break;
> + }
> + }
> + if (no_more_task && !pp->nr_processes)
> + break;

I may have comments on other parts of this patch, but I noticed this
a bit hard to read while reading the end result.

Losing the outer "if (!no_more_task)" and replacing the above with

for (no_more_task = 0, i = 0;
 !no_more_task && i < spawn_cap;
 i++) {
... do things that may or may not set
... no_more_task
}
if (no_more_task && ...)
break;

would make it clear that regardless of spawn_cap, no_more_task is
honored.

Also I think that having the outer "if (!no_more_task)" and not
having "no_more_task = 0" after each iteration is buggy.  Even when
next_task() told start_one() that it does not have more tasks for
now, as long as there are running processes, it is entirely plausible
that next call to next_task() can return "now we have some more task
to do".

Although I think it would make it unsightly, if you want to have the
large indentation that protects the spawn_cap loop from getting
entered, the condition would be 

if (!pp->shutdown) {
for (... spawn_cap loop ...) {
...
}
}

That structure could make sense.  But even then I would probably
write it more like

...
int spawn_cap = 4;

pp = pp_init(...);
while (1) {
int no_more_task = 0;

for (i = 0;
 !no_more_task && !pp->shutdown && i < spawn_cap;
 i++) {
...
code = start_one();
... set no_more_task to 1 as needed
... set pp->shutdown to 1 as needed
}
if (no_more_task && !pp->nr_processes)
break;
buffer_stderr(...);
output(...);
collect(...);
}

That is, you need to have two independent conditions that tell you
not to spawn any new task:

 (1) You called start_one() repeatedly and next_task() said "nothing
 more for now", so you know calling start_one() one more time
 without changing other conditions (like draining output from
 running processes and culling finished ones) will not help.

 Letting other parts of the application that uses this scheduler
 loop (i.e. drain output, cull finished process, etc.) may
 change the situation and you _do_ need to call start_one() when
 the next_task() merely said "nothing more for now".

 That is what no_more_task controls.

 (2) The application said "I want the system to be gracefully shut
 down".  next_task() may also have said "nothing more for now"
 and you may have set no_more_task in response to it, but unlike
 (1) above, draining and culling must be done only to shut the
 system down, the application does not want new processes to be
 added.  You do not want to enter the spawn_cap loop when it
 happens.

 That is what pp->shutdown controls.

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[PATCH 6/8] run-command: add an asynchronous parallel child processor

2015-09-28 Thread Stefan Beller
This allows running external commands in parallel with ordered output
on stderr.

If we run external commands in parallel we cannot pipe the output directly
to our stdout/err as it would mix up. So each process's output will
flow through a pipe, which we buffer. One subprocess can be directly
piped to our stdout/err for low latency feedback to the user.

Example:
Let's assume we have 5 submodules A,B,C,D,E and each fetch takes a
different amount of time as the different submodules vary in size, then
the output of fetches in sequential order might look like this:

 time -->
 output: |---A---| |-B-| |---C---| |-D-| |-E-|

When we schedule these submodules into maximal two parallel processes,
a schedule and sample output over time may look like this:

process 1: |---A---| |-D-| |-E-|

process 2: |-B-| |---C---|

output:|---A---|B|---C---|DE

So A will be perceived as it would run normally in the single child
version. As B has finished by the time A is done, we can dump its whole
progress buffer on stderr, such that it looks like it finished in no
time. Once that is done, C is determined to be the visible child and
its progress will be reported in real time.

So this way of output is really good for human consumption, as it only
changes the timing, not the actual output.

For machine consumption the output needs to be prepared in the tasks,
by either having a prefix per line or per block to indicate whose task's
output is displayed, because the output order may not follow the
original sequential ordering:

 |A| |--B--| |-C-|

will be scheduled to be all parallel:

process 1: |A|
process 2: |--B--|
process 3: |-C-|
output:|A|CB

This happens because C finished before B did, so it will be queued for
output before B.

Signed-off-by: Stefan Beller 
Signed-off-by: Junio C Hamano 
---
 run-command.c  | 348 +
 run-command.h  |  63 +
 t/t0061-run-command.sh |  20 +++
 test-run-command.c |  24 
 4 files changed, 455 insertions(+)

diff --git a/run-command.c b/run-command.c
index 28e1d55..df84985 100644
--- a/run-command.c
+++ b/run-command.c
@@ -3,6 +3,8 @@
 #include "exec_cmd.h"
 #include "sigchain.h"
 #include "argv-array.h"
+#include "thread-utils.h"
+#include "strbuf.h"
 
 void child_process_init(struct child_process *child)
 {
@@ -852,3 +854,349 @@ int capture_command(struct child_process *cmd, struct 
strbuf *buf, size_t hint)
close(cmd->out);
return finish_command(cmd);
 }
+
+struct parallel_processes {
+   void *data;
+
+   int max_processes;
+   int nr_processes;
+
+   get_next_task_fn get_next_task;
+   start_failure_fn start_failure;
+   return_value_fn return_value;
+
+   struct {
+   unsigned in_use : 1;
+   struct child_process process;
+   struct strbuf err;
+   } *children;
+   /*
+* The struct pollfd is logically part of *children,
+* but the system call expects it as its own array.
+*/
+   struct pollfd *pfd;
+
+   unsigned shutdown : 1;
+
+   int output_owner;
+   struct strbuf buffered_output; /* of finished children */
+} parallel_processes_struct;
+
+static int default_start_failure(void *data,
+struct child_process *cp,
+struct strbuf *err)
+{
+   int i;
+
+   strbuf_addstr(err, "Starting a child failed:");
+   for (i = 0; cp->argv[i]; i++)
+   strbuf_addf(err, " %s", cp->argv[i]);
+
+   return 0;
+}
+
+static int default_return_value(void *data,
+   struct child_process *cp,
+   struct strbuf *err,
+   int result)
+{
+   int i;
+
+   if (!result)
+   return 0;
+
+   strbuf_addf(err, "A child failed with return code %d:", result);
+   for (i = 0; cp->argv[i]; i++)
+   strbuf_addf(err, " %s", cp->argv[i]);
+
+   return 0;
+}
+
+static void kill_children(struct parallel_processes *pp, int signo)
+{
+   int i, n = pp->max_processes;
+
+   for (i = 0; i < n; i++)
+   if (pp->children[i].in_use)
+   kill(pp->children[i].process.pid, signo);
+}
+
+static void handle_children_on_signal(int signo)
+{
+   struct parallel_processes *pp = &parallel_processes_struct;
+
+   kill_children(pp, signo);
+   sigchain_pop(signo);
+   raise(signo);
+}
+
+static struct parallel_processes *pp_init(int n, void *data,
+ get_next_task_fn get_next_task,
+ start_failure_fn start_failure,
+ return_value_fn return_value)
+{
+   int i;
+   struct parallel_processes *pp = &parallel_processes_struct;
+
+   if (n < 1)
+   n =