In the last patch we added an easy way to get a thread pool. If we now
run external commands from threads in that pool, the output of the
different commands will be interleaved.

To solve this problem we protect the output with a mutex so that it does
not become garbled. Each thread tries to acquire the mutex. The thread
that gets it pipes its output directly to the standard output; every
other thread buffers its output until it can acquire the mutex.
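
The scheme can be sketched in isolation as follows. This is only a
minimal illustration, not the code in this patch: the names
`task_output`, `task_emit` and `task_finish` are hypothetical, a
fixed-size array stands in for the strbuf, and the destination is an
arbitrary `FILE *` rather than the stream used by run_command().

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical per-task state; none of these names exist in git. */
struct task_output {
	pthread_mutex_t *mutex;	/* shared by all tasks */
	int have_lock;		/* nonzero once this task owns the mutex */
	char pending[4096];	/* output buffered while the mutex is busy */
	size_t pending_len;
};

/*
 * Buffer one chunk of output. If this task owns the mutex, or can get
 * it without blocking, flush everything buffered so far.
 * Returns the number of bytes written to `out`.
 */
static size_t task_emit(struct task_output *t, const char *chunk,
			size_t len, FILE *out)
{
	size_t flushed = 0;

	/* Assumption of this sketch: the fixed buffer is large enough. */
	assert(t->pending_len + len <= sizeof(t->pending));
	memcpy(t->pending + t->pending_len, chunk, len);
	t->pending_len += len;

	if (!t->have_lock && pthread_mutex_trylock(t->mutex) == 0)
		t->have_lock = 1;
	if (t->have_lock) {
		flushed = fwrite(t->pending, 1, t->pending_len, out);
		t->pending_len = 0;
	}
	return flushed;
}

/*
 * Called once the task is done: block on the mutex if we never got it,
 * flush the remaining buffer, then release the mutex for the next task.
 * Returns the number of bytes written to `out`.
 */
static size_t task_finish(struct task_output *t, FILE *out)
{
	size_t flushed;

	if (!t->have_lock) {
		pthread_mutex_lock(t->mutex);
		t->have_lock = 1;
	}
	flushed = fwrite(t->pending, 1, t->pending_len, out);
	fflush(out);
	t->pending_len = 0;
	t->have_lock = 0;
	pthread_mutex_unlock(t->mutex);
	return flushed;
}
```

A task calls task_emit() for every chunk it reads from its child and
task_finish() once the child has closed its end of the pipe. Using
trylock while the child is running keeps the reader from blocking, so
the pipe is always drained; only the final flush waits for the mutex.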

Example:
Let's assume we have 5 tasks A, B, C, D, E and each task takes a different
amount of time (say `git fetch` for different submodules). The output
of the tasks run in sequential order might then look like this:

 time -->
 output: |---A---|   |-B-|   |----C-----------|   |-D-|   |-E-|

When we schedule these tasks into two threads, a schedule
and sample output over time may look like this:

thread 1: |---A---|   |-D-|   |-E-|

thread 2: |-B-|   |----C-----------|

output:   |---A---|B|------C-------|ED

So A will be perceived just as it would be in the single-threaded
version of foreach. As B has finished by the time the mutex becomes
available, its whole buffer will just be dumped to the standard
output. The user will perceive this as a 'very fast' operation of B.
Once that is done, C takes the mutex and flushes its piled-up
buffer to standard output. If the task is a git command, we get
a nice progress display, which will just look as if the first
half of C happened really quickly.

Notice how E and D are output in a different order than in the
sequential run, as the new parallel foreach doesn't care about the
order.

So this way of output works well for human consumption, as it
only changes the timing and the order of the tasks, not the
output of any individual task.

For machine consumption the output needs to be prepared in
the tasks, by having a prefix either per line or per block
to indicate which task's output is displayed.
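
A per-line prefix could, for instance, be produced by a small helper
like the one below. This is only a sketch: `write_prefixed` and the
"<prefix>: " format are hypothetical and not part of this patch.

```c
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: write `text` to `out`, putting "<prefix>: " in
 * front of every line. A final line without a trailing newline is
 * terminated with one. Returns the number of lines written.
 */
static int write_prefixed(FILE *out, const char *prefix, const char *text)
{
	int lines = 0;

	while (*text) {
		const char *eol = strchr(text, '\n');
		size_t len = eol ? (size_t)(eol - text) : strlen(text);

		fprintf(out, "%s: %.*s\n", prefix, (int)len, text);
		lines++;

		text += len;
		if (eol)
			text++;	/* skip the newline itself */
	}
	return lines;
}
```

A task would run its buffered output through such a helper, with its
own name (e.g. the submodule path) as the prefix, before the buffer is
flushed, so that a parser can attribute every line to a task even when
blocks from different tasks follow each other directly.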

Signed-off-by: Stefan Beller <sbel...@google.com>
---
 run-command.c | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 run-command.h | 18 ++++++++++++++++++
 2 files changed, 78 insertions(+)

diff --git a/run-command.c b/run-command.c
index 3d37f8c..bac4367 100644
--- a/run-command.c
+++ b/run-command.c
@@ -556,17 +556,77 @@ int finish_command(struct child_process *cmd)
 }
 
 int run_command(struct child_process *cmd)
+#ifdef NO_PTHREADS
 {
        int code;
 
        if (cmd->out < 0 || cmd->err < 0)
                die("BUG: run_command with a pipe can cause deadlock");
 
+       if (cmd->sync_buf)
+               xwrite(cmd->out, cmd->sync_buf->buf, cmd->sync_buf->len);
+
        code = start_command(cmd);
        if (code)
                return code;
        return finish_command(cmd);
 }
+#else
+{
+       int code, lock_acquired = 0;
+
+       if (!cmd->sync_mutex) {
+               if (cmd->out < 0 || cmd->err < 0)
+                       die("BUG: run_command with a pipe can cause deadlock");
+
+               if (cmd->sync_buf)
+                       xwrite(cmd->out, cmd->sync_buf->buf, cmd->sync_buf->len);
+       } else {
+               if (cmd->out < 0)
+                       die("BUG: run_command with a pipe can cause deadlock");
+
+               if (!cmd->stdout_to_stderr)
+                       die("BUG: run_command with sync_mutex not supported "
+                           "without stdout_to_stderr set");
+
+               if (!cmd->sync_buf)
+                       die("BUG: Must pass a buffer when specifying "
+                           "to sync output");
+       }
+
+       code = start_command(cmd);
+       if (code)
+               return code;
+
+       if (cmd->sync_mutex) {
+               while (1) {
+                       char buf[1024];
+                       ssize_t len = xread(cmd->err, buf, sizeof(buf));
+                       if (len < 0)
+                               die("Read from command failed");
+                       else if (len == 0)
+                               break;
+                       else
+                               strbuf_add(cmd->sync_buf, buf, len);
+
+                       if (!lock_acquired
+                           && !pthread_mutex_trylock(cmd->sync_mutex))
+                               lock_acquired = 1;
+                       if (lock_acquired) {
+                               fputs(cmd->sync_buf->buf, stderr);
+                               strbuf_reset(cmd->sync_buf);
+                       }
+               }
+               if (!lock_acquired)
+                       pthread_mutex_lock(cmd->sync_mutex);
+
+               fputs(cmd->sync_buf->buf, stderr);
+               pthread_mutex_unlock(cmd->sync_mutex);
+       }
+
+       return finish_command(cmd);
+}
+#endif
 
 int run_command_v_opt(const char **argv, int opt)
 {
diff --git a/run-command.h b/run-command.h
index 176a5b2..0df83c9 100644
--- a/run-command.h
+++ b/run-command.h
@@ -43,6 +43,24 @@ struct child_process {
        unsigned stdout_to_stderr:1;
        unsigned use_shell:1;
        unsigned clean_on_exit:1;
+#ifndef NO_PTHREADS
+       /*
+        * In a threaded environment running multiple commands from different
+        * threads would lead to garbled output as the output of different
+        * commands would mix.
+        * If this mutex is not NULL, the output of `err` is only done when
+        * holding the mutex. If the mutex cannot be acquired, the output
+        * will be buffered until the mutex can be acquired.
+        */
+       pthread_mutex_t *sync_mutex;
+#endif
+       /*
+        * The sync_buf will be used to buffer the output while the mutex
+        * is not acquired. It can also contain data before being passed into
+        * run_command, which will be output together with the output of
+        * the child.
+        */
+       struct strbuf *sync_buf;
 };
 
 #define CHILD_PROCESS_INIT { NULL, ARGV_ARRAY_INIT, ARGV_ARRAY_INIT }
-- 
2.5.0.264.g5e52b0d
