The following pull request was submitted through GitHub.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/2727

This e-mail was sent by the LXC bot, direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

From 5c90a418798d42bf9417ad6ef237b2725a20a084 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brau...@ubuntu.com>
Date: Thu, 15 Dec 2016 23:55:23 +0100
Subject: [PATCH 1/5] exec: Exec() return attached PID && take bool arg

Give Exec() an explicit boolean argument "wait" which the caller can use to tell
the function whether it should wait on the command or not, and return the
attached PID as an additional return value.
When "wait" is false, it's the caller's responsibility to wait on the command.
(Note: The attached PID returned by Exec() cannot be waited upon since it's a
child of the lxd forkexec command. It can however be used to e.g. forward
signals.)

Signed-off-by: Christian Brauner <christian.brau...@ubuntu.com>
---
 lxd/container.go      | 15 ++++++++--
 lxd/container_exec.go | 81 ++++++++++++++++++++++++++++++++++++++++-----------
 lxd/container_lxc.go  | 39 ++++++++++++++++++-------
 lxd/main_forkexec.go  | 41 ++++++++++++++++++++++++--
 4 files changed, 145 insertions(+), 31 deletions(-)

diff --git a/lxd/container.go b/lxd/container.go
index 995140a..8d4a9af 100644
--- a/lxd/container.go
+++ b/lxd/container.go
@@ -442,8 +442,19 @@ type container interface {
        FilePush(srcpath string, dstpath string, uid int, gid int, mode int) 
error
        FileRemove(path string) error
 
-       // Command execution
-       Exec(command []string, env map[string]string, stdin *os.File, stdout 
*os.File, stderr *os.File) (int, error)
+       /* Command execution:
+                * 1. passing in true for wait
+                *    - equivalent to calling cmd.Run()
+                * 2. passing in false for wait
+                *    - start the command and return its PID in the first return
+                *      argument and the PID of the attached process in the 
second
+                *      argument. It's the callers responsibility to wait on the
+                *      command. (Note. The returned PID of the attached 
process can not
+                *      be waited upon since it's a child of the lxd forkexec 
command
+                *      (the PID returned in the first return argument). It can 
however
+                *      be used to e.g. forward signals.)
+       */
+       Exec(command []string, env map[string]string, stdin *os.File, stdout 
*os.File, stderr *os.File, wait bool) (int, int, error)
 
        // Status
        Render() (interface{}, error)
diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index ba7c7aa..0b3e440 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -9,6 +9,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "syscall"
 
        "github.com/gorilla/mux"
        "github.com/gorilla/websocket"
@@ -137,11 +138,14 @@ func (s *execWs) Do(op *operation) error {
        }
 
        controlExit := make(chan bool)
+       attachedChildIsBorn := make(chan int)
+       attachedChildIsDead := make(chan bool, 1)
        var wgEOF sync.WaitGroup
 
        if s.interactive {
                wgEOF.Add(1)
                go func() {
+                       <-attachedChildIsBorn
                        select {
                        case <-s.controlConnected:
                                break
@@ -195,6 +199,7 @@ func (s *execWs) Do(op *operation) error {
                                }
                        }
                }()
+
                go func() {
                        readDone, writeDone := 
shared.WebsocketMirror(s.conns[0], ptys[0], ptys[0])
                        <-readDone
@@ -202,6 +207,7 @@ func (s *execWs) Do(op *operation) error {
                        s.conns[0].Close()
                        wgEOF.Done()
                }()
+
        } else {
                wgEOF.Add(len(ttys) - 1)
                for i := 0; i < len(ttys); i++ {
@@ -218,33 +224,74 @@ func (s *execWs) Do(op *operation) error {
                }
        }
 
-       cmdResult, cmdErr := s.container.Exec(s.command, s.env, stdin, stdout, 
stderr)
+       finisher := func(cmdResult int, cmdErr error) error {
+               for _, tty := range ttys {
+                       tty.Close()
+               }
 
-       for _, tty := range ttys {
-               tty.Close()
-       }
+               if s.conns[-1] == nil {
+                       if s.interactive {
+                               controlExit <- true
+                       }
+               } else {
+                       s.conns[-1].Close()
+               }
+
+               attachedChildIsDead <- true
 
-       if s.conns[-1] == nil {
-               if s.interactive {
-                       controlExit <- true
+               wgEOF.Wait()
+
+               for _, pty := range ptys {
+                       pty.Close()
                }
-       } else {
-               s.conns[-1].Close()
-       }
 
-       wgEOF.Wait()
+               metadata := shared.Jmap{"return": cmdResult}
+               err = op.UpdateMetadata(metadata)
+               if err != nil {
+                       return err
+               }
 
-       for _, pty := range ptys {
-               pty.Close()
+               return cmdErr
        }
 
-       metadata := shared.Jmap{"return": cmdResult}
-       err = op.UpdateMetadata(metadata)
+       pid, attachedPid, err := s.container.Exec(s.command, s.env, stdin, 
stdout, stderr, false)
        if err != nil {
                return err
        }
 
-       return cmdErr
+       if s.interactive {
+               attachedChildIsBorn <- attachedPid
+       }
+
+       proc, err := os.FindProcess(pid)
+       if err != nil {
+               return finisher(-1, fmt.Errorf("Failed finding process: %q", 
err))
+       }
+
+       procState, err := proc.Wait()
+       if err != nil {
+               return finisher(-1, fmt.Errorf("Failed waiting on process %d: 
%q", pid, err))
+       }
+
+       if procState.Success() {
+               return finisher(0, nil)
+       }
+
+       status, ok := procState.Sys().(syscall.WaitStatus)
+       if ok {
+               if status.Exited() {
+                       return finisher(status.ExitStatus(), nil)
+               }
+               // Backwards compatible behavior. Report success when we exited
+               // due to a signal. Otherwise this may break Jenkins, e.g. when
+               // lxc exec foo reboot receives SIGTERM and status.ExitStatus()
+               // would report -1.
+               if status.Signaled() {
+                       return finisher(0, nil)
+               }
+       }
+
+       return finisher(-1, nil)
 }
 
 func containerExecPost(d *Daemon, r *http.Request) Response {
@@ -337,7 +384,7 @@ func containerExecPost(d *Daemon, r *http.Request) Response 
{
        }
 
        run := func(op *operation) error {
-               cmdResult, cmdErr := c.Exec(post.Command, env, nil, nil, nil)
+               cmdResult, _, cmdErr := c.Exec(post.Command, env, nil, nil, 
nil, true)
                metadata := shared.Jmap{"return": cmdResult}
                err = op.UpdateMetadata(metadata)
                if err != nil {
diff --git a/lxd/container_lxc.go b/lxd/container_lxc.go
index a07d900..fcbc20a 100644
--- a/lxd/container_lxc.go
+++ b/lxd/container_lxc.go
@@ -4007,7 +4007,7 @@ func (c *containerLXC) FileRemove(path string) error {
        return nil
 }
 
-func (c *containerLXC) Exec(command []string, env map[string]string, stdin 
*os.File, stdout *os.File, stderr *os.File) (int, error) {
+func (c *containerLXC) Exec(command []string, env map[string]string, stdin 
*os.File, stdout *os.File, stderr *os.File, wait bool) (int, int, error) {
        envSlice := []string{}
 
        for k, v := range env {
@@ -4031,25 +4031,44 @@ func (c *containerLXC) Exec(command []string, env 
map[string]string, stdin *os.F
        cmd.Stdout = stdout
        cmd.Stderr = stderr
 
-       shared.LogInfo("Executing command", log.Ctx{"environment": envSlice, 
"args": args})
+       r, w, err := shared.Pipe()
+       defer r.Close()
+       if err != nil {
+               shared.LogErrorf("s", err)
+               return -1, -1, err
+       }
 
-       err := cmd.Run()
+       cmd.ExtraFiles = []*os.File{w}
+       err = cmd.Start()
+       if err != nil {
+               w.Close()
+               return -1, -1, err
+       }
+       w.Close()
+       attachedPid := -1
+       if err := json.NewDecoder(r).Decode(&attachedPid); err != nil {
+               shared.LogErrorf("Failed to retrieve PID of executing child 
process: %s", err)
+               return -1, -1, err
+       }
+
+       // It's the callers responsibility to wait or not wait.
+       if !wait {
+               return cmd.Process.Pid, attachedPid, nil
+       }
+
+       err = cmd.Wait()
        if err != nil {
                exitErr, ok := err.(*exec.ExitError)
                if ok {
                        status, ok := exitErr.Sys().(syscall.WaitStatus)
                        if ok {
-                               shared.LogInfo("Executed command", 
log.Ctx{"environment": envSlice, "args": args, "exit_status": 
status.ExitStatus()})
-                               return status.ExitStatus(), nil
+                               return status.ExitStatus(), attachedPid, nil
                        }
                }
-
-               shared.LogInfo("Failed executing command", 
log.Ctx{"environment": envSlice, "args": args, "err": err})
-               return -1, err
+               return -1, -1, err
        }
 
-       shared.LogInfo("Executed command", log.Ctx{"environment": envSlice, 
"args": args})
-       return 0, nil
+       return 0, attachedPid, nil
 }
 
 func (c *containerLXC) diskState() map[string]shared.ContainerStateDisk {
diff --git a/lxd/main_forkexec.go b/lxd/main_forkexec.go
index e3ebcc6..8ebb0a6 100644
--- a/lxd/main_forkexec.go
+++ b/lxd/main_forkexec.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "encoding/json"
        "fmt"
        "os"
        "strings"
@@ -89,10 +90,46 @@ func cmdForkExec(args []string) (int, error) {
 
        opts.Env = env
 
-       status, err := c.RunCommandStatus(cmd, opts)
+       status, err := c.RunCommandNoWait(cmd, opts)
        if err != nil {
                return -1, fmt.Errorf("Failed running command: %q", err)
        }
+       // Send the PID of the executing process.
+       w := os.NewFile(uintptr(3), "attachedPid")
+       defer w.Close()
 
-       return status >> 8, nil
+       err = json.NewEncoder(w).Encode(status)
+       if err != nil {
+               return -1, fmt.Errorf("Failed sending PID of executing command: 
%q", err)
+       }
+
+       proc, err := os.FindProcess(status)
+       if err != nil {
+               return -1, fmt.Errorf("Failed finding process: %q", err)
+       }
+
+       procState, err := proc.Wait()
+       if err != nil {
+               return -1, fmt.Errorf("Failed waiting on process %d: %q", 
status, err)
+       }
+
+       if procState.Success() {
+               return 0, nil
+       }
+
+       exCode, ok := procState.Sys().(syscall.WaitStatus)
+       if ok {
+               if exCode.Exited() {
+                       return exCode.ExitStatus(), nil
+               }
+               // Backwards compatible behavior. Report success when we exited
+               // due to a signal. Otherwise this may break Jenkins, e.g. when
+               // lxc exec foo reboot receives SIGTERM and exCode.ExitStatus()
+               // would report -1.
+               if exCode.Signaled() {
+                       return 0, nil
+               }
+       }
+
+       return -1, fmt.Errorf("Command failed")
 }

From aef1c5a6962d662278b445c2a35d459537946be9 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brau...@ubuntu.com>
Date: Thu, 8 Dec 2016 14:34:50 +0100
Subject: [PATCH 2/5] shared/util_linux: add GetPollRevents()

It's a wrapper around a C function that polls on a single file descriptor and
returns the status and the detected revents.

Signed-off-by: Christian Brauner <christian.brau...@ubuntu.com>
---
 shared/util_linux.go | 42 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 51e0e43..0f83573 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -25,6 +25,7 @@ import (
 #include <errno.h>
 #include <fcntl.h>
 #include <limits.h>
+#include <poll.h>
 #include <string.h>
 #include <stdio.h>
 
@@ -181,9 +182,50 @@ int shiftowner(char *basepath, char *path, int uid, int 
gid) {
        close(fd);
        return 0;
 }
+
+int get_poll_revents(int lfd, int timeout, int flags, int *revents, int 
*saved_errno)
+{
+       int ret;
+       struct pollfd pfd = {lfd, flags, 0};
+
+again:
+       ret = poll(&pfd, 1, timeout);
+       if (ret < 0) {
+               if (errno == EINTR)
+                       goto again;
+
+               *saved_errno = errno;
+               fprintf(stderr, "Failed to poll() on file descriptor.\n");
+               return -1;
+       }
+
+       *revents = pfd.revents;
+
+       return ret;
+}
 */
 import "C"
 
+const POLLIN int = C.POLLIN
+const POLLPRI int = C.POLLPRI
+const POLLNVAL int = C.POLLNVAL
+const POLLERR int = C.POLLERR
+const POLLHUP int = C.POLLHUP
+const POLLRDHUP int = C.POLLRDHUP
+
+func GetPollRevents(fd int, timeout int, flags int) (int, int, error) {
+       var err error
+       revents := C.int(0)
+       saved_errno := C.int(0)
+
+       ret := C.get_poll_revents(C.int(fd), C.int(timeout), C.int(flags), 
&revents, &saved_errno)
+       if int(ret) < 0 {
+               err = syscall.Errno(saved_errno)
+       }
+
+       return int(ret), int(revents), err
+}
+
 func ShiftOwner(basepath string, path string, uid int, gid int) error {
        cbasepath := C.CString(basepath)
        defer C.free(unsafe.Pointer(cbasepath))

From b1b32b01e46913b0f27fef274c57c66e81201446 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brau...@ubuntu.com>
Date: Sat, 10 Dec 2016 12:41:13 +0100
Subject: [PATCH 3/5] exec: detect background tasks to allow clean exit

This adds the function ExecReaderToChannel() to our linux specific utility
functions. It is the workhorse that reads from the master side of a pty file
descriptor. Background tasks are identified correctly. That means you can run:

        chb@conventiont|~/source/go/bin
        > ./lxc exec xen1 -- bash
        root@xen1:~# sleep infinity &

or

        chb@conventiont|~/source/go/bin
        > ./lxc exec xen1 -- bash
        root@xen1:~# yes &
        [1] 290
        root@xen1:~# y
        root@xen1:~# y
        root@xen1:~# y
        .
        .
        .

and still cleanly exit via "Ctrl+D" or "exit". The function is explained in
detail directly in the code.

Signed-off-by: Christian Brauner <christian.brau...@ubuntu.com>
---
 shared/util_linux.go | 181 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 181 insertions(+)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 0f83573..77e4b23 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -6,9 +6,12 @@ package shared
 import (
        "errors"
        "fmt"
+       "io"
        "os"
        "os/exec"
        "strings"
+       "sync"
+       "sync/atomic"
        "syscall"
        "unsafe"
 )
@@ -524,3 +527,181 @@ func GetAllXattr(path string) (xattrs map[string]string, 
err error) {
 
        return xattrs, nil
 }
+
+// Extensively commented directly in the code. Please leave the comments!
+// Looking at this in a couple of months no one will know why and how this works
+// anymore.
+func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan bool, fd 
int) <-chan []byte {
+       if bufferSize <= (128 * 1024) {
+               bufferSize = (128 * 1024)
+       }
+
+       ch := make(chan ([]byte))
+       mutex := &sync.Mutex{}
+       chanClosed := false
+       onReturn := func() {
+               mutex.Lock()
+               if !chanClosed {
+                       chanClosed = true
+                       close(ch)
+               }
+               mutex.Unlock()
+       }
+
+       // COMMENT(brauner):
+       // [1]: This function has just one job: Dealing with the case where we
+       // are running an interactive shell session where we put a process in
+       // the background that does hold stdin/stdout open, but does not
+       // generate any output at all. This case cannot be dealt with in the
+       // following function call. Here's why: Assume the above case, now the
+       // attached child (the shell in this example) exits. This will not
+       // generate any poll() event: We won't get POLLHUP because the
+       // background process is holding stdin/stdout open and no one is writing
+       // to it. So we effectively block on GetPollRevents() in the function
+       // below. Hence, we use another go routine here whose only job is to
+       // handle that case: When we detect that the child has exited we check
+       // whether a POLLIN or POLLHUP event has been generated. If not, we know
+       // that there's nothing buffered on stdout and exit.
+       var attachedChildIsDead int32 = 0
+       go func() {
+               <-exited
+
+               atomic.StoreInt32(&attachedChildIsDead, 1)
+
+               ret, revents, err := GetPollRevents(fd, 0, (POLLIN | POLLPRI | 
POLLERR | POLLHUP | POLLRDHUP))
+               if ret < 0 {
+                       LogErrorf("Failed to poll(POLLIN | POLLPRI | POLLHUP | 
POLLRDHUP) on file descriptor: %s.", err)
+               } else if ret > 0 {
+                       if (revents & POLLERR) > 0 {
+                               LogWarnf("Detected poll(POLLERR) event.")
+                       }
+               } else if ret == 0 {
+                       LogDebugf("No data in stdout: exiting.")
+                       onReturn()
+                       return
+               }
+       }()
+
+       go func() {
+               readSize := (128 * 1024)
+               offset := 0
+               buf := make([]byte, bufferSize)
+               avoidAtomicLoad := false
+
+               defer onReturn()
+               for {
+                       nr := 0
+                       var err error
+
+                       ret, revents, err := GetPollRevents(fd, -1, (POLLIN | 
POLLPRI | POLLERR | POLLHUP | POLLRDHUP))
+                       if ret < 0 {
+                               // COMMENT(brauner):
+                               // This condition is only reached in cases 
where we are massively f*cked since we even handle
+                               // EINTR in the underlying C wrapper around 
poll(). So let's exit here.
+                               LogErrorf("Failed to poll(POLLIN | POLLPRI | 
POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. Exiting.", err)
+                               return
+                       }
+
+                       // COMMENT(brauner):
+                       // [2]: If the process exits before all its data has 
been read by us and no other process holds stdin or
+                       // stdout open, then we will observe a (POLLHUP | 
POLLRDHUP | POLLIN) event. This means, we need to
+                       // keep on reading from the pty file descriptor until 
we get a simple POLLHUP back.
+                       both := ((revents & (POLLIN | POLLPRI)) > 0) && 
((revents & (POLLHUP | POLLRDHUP)) > 0)
+                       if both {
+                               LogDebugf("Detected poll(POLLIN | POLLPRI | 
POLLHUP | POLLRDHUP) event.")
+                               read := buf[offset : offset+readSize]
+                               nr, err = r.Read(read)
+                       }
+
+                       if (revents & POLLERR) > 0 {
+                               LogWarnf("Detected poll(POLLERR) event: 
exiting.")
+                               return
+                       }
+
+                       if ((revents & (POLLIN | POLLPRI)) > 0) && !both {
+                               // COMMENT(brauner):
+                               // This might appear unintuitive at first but 
is actually a nice trick: Assume we are running
+                               // a shell session in a container and put a 
process in the background that is writing to
+                               // stdout. Now assume the attached process (aka 
the shell in this example) exits because we
+                               // used Ctrl+D to send EOF or something. If no 
other process would be holding stdout open we
+                               // would expect to observe either a (POLLHUP | 
POLLRDHUP | POLLIN | POLLPRI) event if there
+                               // is still data buffered from the previous 
process or a simple (POLLHUP | POLLRDHUP) if
+                               // no data is buffered. The fact that we only 
observe a (POLLIN | POLLPRI) event means that
+                               // another process is holding stdout open and 
is writing to it.
+                               // One counter argument that can be leveraged 
is (brauner looks at tycho :))
+                               // "Hey, you need to write at least one 
additional tty buffer to make sure that
+                               // everything that the attached child has 
written is actually shown."
+                               // The answer to that is:
+                               // "This case can only happen if the process 
has exited and has left data in stdout which
+                               // would generate a (POLLIN | POLLPRI | POLLHUP 
| POLLRDHUP) event and this case is already
+                               // handled and triggers another codepath. (See 
[2].)"
+                               if avoidAtomicLoad || 
atomic.LoadInt32(&attachedChildIsDead) == 1 {
+                                       avoidAtomicLoad = true
+                                       // COMMENT(brauner):
+                                       // Handle race between 
atomic.StoreInt32() in the go routine
+                                       // explained in [1] and 
atomic.LoadInt32() in the go routine
+                                       // here:
+                                       // We need to check for (POLLHUP | 
POLLRDHUP) here again since we might
+                                       // still be handling a pure POLLIN 
event from a write prior to the childs
+                                       // exit. But the child might have 
exited right before and performed
+                                       // atomic.StoreInt32() to update 
attachedChildIsDead before we
+                                       // performed our atomic.LoadInt32(). 
This means we accidentally hit this
+                                       // codepath and are misinformed about 
the available poll() events. So we
+                                       // need to perform a non-blocking 
poll() again to exclude that case:
+                                       //
+                                       // - If we detect no (POLLHUP | 
POLLRDHUP) event we know the child
+                                       //   has already exited but someone 
else is holding stdin/stdout open and
+                                       //   writing to it.
+                                       //   Note that this case should only 
ever be triggered in situations like
+                                       //   running a shell and doing stuff 
like:
+                                       //    > ./lxc exec xen1 -- bash
+                                       //   root@xen1:~# yes &
+                                       //   .
+                                       //   .
+                                       //   .
+                                       //   now send Ctrl+D or type "exit". By 
the time the Ctrl+D/exit event is
+                                       //   triggered, we will have read all 
of the childs data it has written to
+                                       //   stdout and so we can assume that 
anything that comes now belongs to
+                                       //   the process that is holding 
stdin/stdout open.
+                                       //
+                                       // - If we detect a (POLLHUP | 
POLLRDHUP) event we know that we've
+                                       //   hit this codepath on accident 
caused by the race between
+                                       //   atomic.StoreInt32() in the go 
routine explained in [1] and
+                                       //   atomic.LoadInt32() in this go 
routine. So the next call to
+                                       //   GetPollRevents() will either return
+                                       //   (POLLIN | POLLPRI | POLLERR | 
POLLHUP | POLLRDHUP)
+                                       //   or (POLLHUP | POLLRDHUP). Both 
will trigger another codepath (See [2].)
+                                       //   that takes care that all data of 
the child that is buffered in
+                                       //   stdout is written out.
+                                       ret, revents, err := GetPollRevents(fd, 
0, (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP))
+                                       if ret < 0 {
+                                               LogErrorf("Failed to 
poll(POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. 
Exiting.", err)
+                                               return
+                                       } else if (revents & (POLLHUP | 
POLLRDHUP)) == 0 {
+                                               LogDebugf("Exiting but 
background processes are still running.")
+                                               return
+                                       }
+                               }
+                               read := buf[offset : offset+readSize]
+                               nr, err = r.Read(read)
+                       }
+
+                       // COMMENT(brauner):
+                       // The attached process has exited and we have read all 
data that may have
+                       // been buffered.
+                       if ((revents & (POLLHUP | POLLRDHUP)) > 0) && !both {
+                               LogDebugf("Detected poll(POLLHUP) event: 
exiting.")
+                               return
+                       }
+
+                       offset += nr
+                       if offset > 0 && (offset+readSize >= bufferSize || err 
!= nil) {
+                               ch <- buf[0:offset]
+                               offset = 0
+                               buf = make([]byte, bufferSize)
+                       }
+               }
+       }()
+
+       return ch
+}

From bf619338e221a3b58f1704003db7f21ebd1e9729 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brau...@ubuntu.com>
Date: Thu, 8 Dec 2016 14:46:10 +0100
Subject: [PATCH 4/5] shared/network_linux: add WebsocketExecMirror()

This function specifically deals with running commands attached to a pty in the
container. We need to put it in a separate file as it is Linux specific.

Signed-off-by: Christian Brauner <christian.brau...@ubuntu.com>
---
 lxd/container_exec.go   |   2 +-
 lxd/rsync.go            |   4 +-
 shared/network.go       | 153 +++++++++++++++++++++++++++---------------------
 shared/network_linux.go |  48 +++++++++++++++
 4 files changed, 136 insertions(+), 71 deletions(-)
 create mode 100644 shared/network_linux.go

diff --git a/lxd/container_exec.go b/lxd/container_exec.go
index 0b3e440..6101757 100644
--- a/lxd/container_exec.go
+++ b/lxd/container_exec.go
@@ -201,7 +201,7 @@ func (s *execWs) Do(op *operation) error {
                }()
 
                go func() {
-                       readDone, writeDone := 
shared.WebsocketMirror(s.conns[0], ptys[0], ptys[0])
+                       readDone, writeDone := 
shared.WebsocketExecMirror(s.conns[0], ptys[0], ptys[0], attachedChildIsDead, 
int(ptys[0].Fd()))
                        <-readDone
                        <-writeDone
                        s.conns[0].Close()
diff --git a/lxd/rsync.go b/lxd/rsync.go
index ee58fee..ba646ee 100644
--- a/lxd/rsync.go
+++ b/lxd/rsync.go
@@ -96,7 +96,7 @@ func RsyncSend(path string, conn *websocket.Conn) error {
                return err
        }
 
-       readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, 
dataSocket)
+       readDone, writeDone := shared.WebsocketMirror(conn, dataSocket, 
dataSocket, nil, nil)
 
        output, err := ioutil.ReadAll(stderr)
        if err != nil {
@@ -146,7 +146,7 @@ func RsyncRecv(path string, conn *websocket.Conn) error {
                return err
        }
 
-       readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout)
+       readDone, writeDone := shared.WebsocketMirror(conn, stdin, stdout, nil, 
nil)
        data, err2 := ioutil.ReadAll(stderr)
        if err2 != nil {
                shared.LogDebugf("error reading rsync stderr: %s", err2)
diff --git a/shared/network.go b/shared/network.go
index 39a7f58..20b14c0 100644
--- a/shared/network.go
+++ b/shared/network.go
@@ -204,84 +204,101 @@ func WebsocketRecvStream(w io.Writer, conn 
*websocket.Conn) chan bool {
        return ch
 }
 
+func defaultReader(conn *websocket.Conn, r io.ReadCloser, readDone chan<- 
bool) {
+       /* For now, we don't need to adjust buffer sizes in
+       * WebsocketMirror, since it's used for interactive things like
+       * exec.
+        */
+       in := ReaderToChannel(r, -1)
+       for {
+               buf, ok := <-in
+               if !ok {
+                       r.Close()
+                       LogDebugf("sending write barrier")
+                       conn.WriteMessage(websocket.TextMessage, []byte{})
+                       readDone <- true
+                       return
+               }
+               w, err := conn.NextWriter(websocket.BinaryMessage)
+               if err != nil {
+                       LogDebugf("Got error getting next writer %s", err)
+                       break
+               }
+
+               _, err = w.Write(buf)
+               w.Close()
+               if err != nil {
+                       LogDebugf("Got err writing %s", err)
+                       break
+               }
+       }
+       closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, 
"")
+       conn.WriteMessage(websocket.CloseMessage, closeMsg)
+       readDone <- true
+       r.Close()
+}
+
+func defaultWriter(conn *websocket.Conn, w io.WriteCloser, writeDone chan<- 
bool) {
+       for {
+               mt, r, err := conn.NextReader()
+               if err != nil {
+                       LogDebugf("Got error getting next reader %s, %s", err, 
w)
+                       break
+               }
+
+               if mt == websocket.CloseMessage {
+                       LogDebugf("Got close message for reader")
+                       break
+               }
+
+               if mt == websocket.TextMessage {
+                       LogDebugf("Got message barrier, resetting stream")
+                       break
+               }
+
+               buf, err := ioutil.ReadAll(r)
+               if err != nil {
+                       LogDebugf("Got error writing to writer %s", err)
+                       break
+               }
+               i, err := w.Write(buf)
+               if i != len(buf) {
+                       LogDebugf("Didn't write all of buf")
+                       break
+               }
+               if err != nil {
+                       LogDebugf("Error writing buf %s", err)
+                       break
+               }
+       }
+       writeDone <- true
+       w.Close()
+}
+
 // WebsocketMirror allows mirroring a reader to a websocket and taking the
 // result and writing it to a writer. This function allows for multiple
 // mirrorings and correctly negotiates stream endings. However, it means any
 // websocket.Conns passed to it are live when it returns, and must be closed
 // explicitly.
-func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser) 
(chan bool, chan bool) {
+type WebSocketMirrorReader func(conn *websocket.Conn, r io.ReadCloser, 
readDone chan<- bool)
+type WebSocketMirrorWriter func(conn *websocket.Conn, w io.WriteCloser, 
writeDone chan<- bool)
+
+func WebsocketMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, 
Reader WebSocketMirrorReader, Writer WebSocketMirrorWriter) (chan bool, chan 
bool) {
        readDone := make(chan bool, 1)
        writeDone := make(chan bool, 1)
-       go func(conn *websocket.Conn, w io.WriteCloser) {
-               for {
-                       mt, r, err := conn.NextReader()
-                       if err != nil {
-                               LogDebugf("Got error getting next reader %s, 
%s", err, w)
-                               break
-                       }
 
-                       if mt == websocket.CloseMessage {
-                               LogDebugf("Got close message for reader")
-                               break
-                       }
-
-                       if mt == websocket.TextMessage {
-                               LogDebugf("Got message barrier, resetting 
stream")
-                               break
-                       }
+       ReadFunc := Reader
+       if ReadFunc == nil {
+               ReadFunc = defaultReader
+       }
 
-                       buf, err := ioutil.ReadAll(r)
-                       if err != nil {
-                               LogDebugf("Got error writing to writer %s", err)
-                               break
-                       }
-                       i, err := w.Write(buf)
-                       if i != len(buf) {
-                               LogDebugf("Didn't write all of buf")
-                               break
-                       }
-                       if err != nil {
-                               LogDebugf("Error writing buf %s", err)
-                               break
-                       }
-               }
-               writeDone <- true
-               w.Close()
-       }(conn, w)
-
-       go func(conn *websocket.Conn, r io.ReadCloser) {
-               /* For now, we don't need to adjust buffer sizes in
-                * WebsocketMirror, since it's used for interactive things like
-                * exec.
-                */
-               in := ReaderToChannel(r, -1)
-               for {
-                       buf, ok := <-in
-                       if !ok {
-                               r.Close()
-                               LogDebugf("sending write barrier")
-                               conn.WriteMessage(websocket.TextMessage, 
[]byte{})
-                               readDone <- true
-                               return
-                       }
-                       w, err := conn.NextWriter(websocket.BinaryMessage)
-                       if err != nil {
-                               LogDebugf("Got error getting next writer %s", 
err)
-                               break
-                       }
+       WriteFunc := Writer
+       if WriteFunc == nil {
+               WriteFunc = defaultWriter
+       }
 
-                       _, err = w.Write(buf)
-                       w.Close()
-                       if err != nil {
-                               LogDebugf("Got err writing %s", err)
-                               break
-                       }
-               }
-               closeMsg := 
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
-               conn.WriteMessage(websocket.CloseMessage, closeMsg)
-               readDone <- true
-               r.Close()
-       }(conn, r)
+       go ReadFunc(conn, r, readDone)
+       go WriteFunc(conn, w, writeDone)
 
        return readDone, writeDone
 }
diff --git a/shared/network_linux.go b/shared/network_linux.go
new file mode 100644
index 0000000..3bee792
--- /dev/null
+++ b/shared/network_linux.go
@@ -0,0 +1,48 @@
+// +build linux
+
+package shared
+
+import (
+       "io"
+
+       "github.com/gorilla/websocket"
+)
+
+func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r 
io.ReadCloser, exited chan bool, fd int) (chan bool, chan bool) {
+       readDone := make(chan bool, 1)
+       writeDone := make(chan bool, 1)
+
+       go defaultWriter(conn, w, writeDone)
+
+       go func(conn *websocket.Conn, r io.ReadCloser) {
+               in := ExecReaderToChannel(r, -1, exited, fd)
+               for {
+                       buf, ok := <-in
+                       if !ok {
+                               r.Close()
+                               LogDebugf("sending write barrier")
+                               conn.WriteMessage(websocket.TextMessage, 
[]byte{})
+                               readDone <- true
+                               return
+                       }
+                       w, err := conn.NextWriter(websocket.BinaryMessage)
+                       if err != nil {
+                               LogDebugf("Got error getting next writer %s", 
err)
+                               break
+                       }
+
+                       _, err = w.Write(buf)
+                       w.Close()
+                       if err != nil {
+                               LogDebugf("Got err writing %s", err)
+                               break
+                       }
+               }
+               closeMsg := 
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+               conn.WriteMessage(websocket.CloseMessage, closeMsg)
+               readDone <- true
+               r.Close()
+       }(conn, r)
+
+       return readDone, writeDone
+}

From 91b39c3ec77ebbb4bc2c7ac4d33de5062838e637 Mon Sep 17 00:00:00 2001
From: Christian Brauner <christian.brau...@ubuntu.com>
Date: Tue, 13 Dec 2016 02:35:23 +0100
Subject: [PATCH 5/5] util_linux: ExecReaderToChannel() use sync.Once

Use sync.Once to ensure that the closeChannel() function is executed exactly
once. This allows us to avoid using a mutex.

Signed-off-by: Christian Brauner <christian.brau...@ubuntu.com>
---
 shared/util_linux.go | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/shared/util_linux.go b/shared/util_linux.go
index 77e4b23..8ef5b82 100644
--- a/shared/util_linux.go
+++ b/shared/util_linux.go
@@ -537,15 +537,12 @@ func ExecReaderToChannel(r io.Reader, bufferSize int, 
exited <-chan bool, fd int
        }
 
        ch := make(chan ([]byte))
-       mutex := &sync.Mutex{}
-       chanClosed := false
-       onReturn := func() {
-               mutex.Lock()
-               if !chanClosed {
-                       chanClosed = true
-                       close(ch)
-               }
-               mutex.Unlock()
+
+       // Takes care that the closeChannel() function is exactly executed once.
+       // This allows us to avoid using a mutex.
+       var once sync.Once
+       closeChannel := func() {
+               close(ch)
        }
 
        // COMMENT(brauner):
@@ -577,7 +574,7 @@ func ExecReaderToChannel(r io.Reader, bufferSize int, 
exited <-chan bool, fd int
                        }
                } else if ret == 0 {
                        LogDebugf("No data in stdout: exiting.")
-                       onReturn()
+                       once.Do(closeChannel)
                        return
                }
        }()
@@ -588,7 +585,7 @@ func ExecReaderToChannel(r io.Reader, bufferSize int, 
exited <-chan bool, fd int
                buf := make([]byte, bufferSize)
                avoidAtomicLoad := false
 
-               defer onReturn()
+               defer once.Do(closeChannel)
                for {
                        nr := 0
                        var err error
_______________________________________________
lxc-devel mailing list
lxc-devel@lists.linuxcontainers.org
http://lists.linuxcontainers.org/listinfo/lxc-devel

Reply via email to