The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/7391
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) ===
From 7f4ad6aef20938900567faf1e504a9348d1852b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Tue, 19 May 2020 17:12:28 -0400 Subject: [PATCH 1/2] shared: Reimplement GetPollRevents without cgo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- shared/netutils/network_linux.go | 202 ----------------------- shared/netutils/network_linux_cgo.go | 210 ++++++++++++++++++++++++ shared/util_linux.go | 208 ++++++++++++++++++++++++ shared/util_linux_cgo.go | 231 --------------------------- 4 files changed, 418 insertions(+), 433 deletions(-) create mode 100644 shared/netutils/network_linux_cgo.go diff --git a/shared/netutils/network_linux.go b/shared/netutils/network_linux.go index 2f31ef70e6..c006e7f388 100644 --- a/shared/netutils/network_linux.go +++ b/shared/netutils/network_linux.go @@ -1,177 +1,16 @@ // +build linux -// +build cgo package netutils import ( - "fmt" "io" - "net" - "os" - "strings" - "unsafe" "github.com/gorilla/websocket" "github.com/lxc/lxd/shared" - "github.com/lxc/lxd/shared/api" "github.com/lxc/lxd/shared/logger" ) -/* -#include "unixfd.h" -#include "netns_getifaddrs.c" -*/ -import "C" - -// NetnsGetifaddrs returns a map of InstanceStateNetwork for a particular process. 
-func NetnsGetifaddrs(initPID int32) (map[string]api.InstanceStateNetwork, error) { - var netnsidAware C.bool - var ifaddrs *C.struct_netns_ifaddrs - var netnsID C.__s32 - - if initPID > 0 { - f, err := os.Open(fmt.Sprintf("/proc/%d/ns/net", initPID)) - if err != nil { - return nil, err - } - defer f.Close() - - netnsID = C.netns_get_nsid(C.__s32(f.Fd())) - if netnsID < 0 { - return nil, fmt.Errorf("Failed to retrieve network namespace id") - } - } else { - netnsID = -1 - } - - ret := C.netns_getifaddrs(&ifaddrs, netnsID, &netnsidAware) - if ret < 0 { - return nil, fmt.Errorf("Failed to retrieve network interfaces and addresses") - } - defer C.netns_freeifaddrs(ifaddrs) - - if netnsID >= 0 && !netnsidAware { - return nil, fmt.Errorf("Netlink requests are not fully network namespace id aware") - } - - // We're using the interface name as key here but we should really - // switch to the ifindex at some point to handle ip aliasing correctly. - networks := map[string]api.InstanceStateNetwork{} - - for addr := ifaddrs; addr != nil; addr = addr.ifa_next { - var address [C.INET6_ADDRSTRLEN]C.char - addNetwork, networkExists := networks[C.GoString(addr.ifa_name)] - if !networkExists { - addNetwork = api.InstanceStateNetwork{ - Addresses: []api.InstanceStateNetworkAddress{}, - Counters: api.InstanceStateNetworkCounters{}, - } - } - - // Interface flags - netState := "down" - netType := "unknown" - - if (addr.ifa_flags & C.IFF_BROADCAST) > 0 { - netType = "broadcast" - } - - if (addr.ifa_flags & C.IFF_LOOPBACK) > 0 { - netType = "loopback" - } - - if (addr.ifa_flags & C.IFF_POINTOPOINT) > 0 { - netType = "point-to-point" - } - - if (addr.ifa_flags & C.IFF_UP) > 0 { - netState = "up" - } - addNetwork.State = netState - addNetwork.Type = netType - addNetwork.Mtu = int(addr.ifa_mtu) - - if initPID != 0 && int(addr.ifa_ifindex_peer) > 0 { - hostInterface, err := net.InterfaceByIndex(int(addr.ifa_ifindex_peer)) - if err == nil { - addNetwork.HostName = hostInterface.Name - } - } 
- - // Addresses - if addr.ifa_addr != nil && (addr.ifa_addr.sa_family == C.AF_INET || addr.ifa_addr.sa_family == C.AF_INET6) { - family := "inet" - if addr.ifa_addr.sa_family == C.AF_INET6 { - family = "inet6" - } - - addrPtr := C.get_addr_ptr(addr.ifa_addr) - if addrPtr == nil { - return nil, fmt.Errorf("Failed to retrieve valid address pointer") - } - - addressStr := C.inet_ntop(C.int(addr.ifa_addr.sa_family), addrPtr, &address[0], C.INET6_ADDRSTRLEN) - if addressStr == nil { - return nil, fmt.Errorf("Failed to retrieve address string") - } - - if addNetwork.Addresses == nil { - addNetwork.Addresses = []api.InstanceStateNetworkAddress{} - } - - goAddrString := C.GoString(addressStr) - scope := "global" - if strings.HasPrefix(goAddrString, "127") { - scope = "local" - } - - if goAddrString == "::1" { - scope = "local" - } - - if strings.HasPrefix(goAddrString, "169.254") { - scope = "link" - } - - if strings.HasPrefix(goAddrString, "fe80:") { - scope = "link" - } - - address := api.InstanceStateNetworkAddress{} - address.Family = family - address.Address = goAddrString - address.Netmask = fmt.Sprintf("%d", int(addr.ifa_prefixlen)) - address.Scope = scope - - addNetwork.Addresses = append(addNetwork.Addresses, address) - } else if addr.ifa_addr != nil && addr.ifa_addr.sa_family == C.AF_PACKET { - if (addr.ifa_flags & C.IFF_LOOPBACK) == 0 { - var buf [1024]C.char - - hwaddr := C.get_packet_address(addr.ifa_addr, &buf[0], 1024) - if hwaddr == nil { - return nil, fmt.Errorf("Failed to retrieve hardware address") - } - - addNetwork.Hwaddr = C.GoString(hwaddr) - } - } - - if addr.ifa_stats_type == C.IFLA_STATS64 { - addNetwork.Counters.BytesReceived = int64(addr.ifa_stats64.rx_bytes) - addNetwork.Counters.BytesSent = int64(addr.ifa_stats64.tx_bytes) - addNetwork.Counters.PacketsReceived = int64(addr.ifa_stats64.rx_packets) - addNetwork.Counters.PacketsSent = int64(addr.ifa_stats64.tx_packets) - } - ifName := C.GoString(addr.ifa_name) - - networks[ifName] = addNetwork - 
} - - return networks, nil -} - // WebsocketExecMirror mirrors a websocket connection with a set of Writer/Reader. func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser, exited chan struct{}, fd int) (chan bool, chan bool) { readDone := make(chan bool, 1) @@ -209,44 +48,3 @@ func WebsocketExecMirror(conn *websocket.Conn, w io.WriteCloser, r io.ReadCloser return readDone, writeDone } - -// AbstractUnixSendFd sends a Unix file descriptor over a Unix socket. -func AbstractUnixSendFd(sockFD int, sendFD int) error { - fd := C.int(sendFD) - skFd := C.int(sockFD) - ret := C.lxc_abstract_unix_send_fds(skFd, &fd, C.int(1), nil, C.size_t(0)) - if ret < 0 { - return fmt.Errorf("Failed to send file descriptor via abstract unix socket") - } - - return nil -} - -// AbstractUnixReceiveFd receives a Unix file descriptor from a Unix socket. -func AbstractUnixReceiveFd(sockFD int) (*os.File, error) { - fd := C.int(-1) - skFd := C.int(sockFD) - ret := C.lxc_abstract_unix_recv_fds(skFd, &fd, C.int(1), nil, C.size_t(0)) - if ret < 0 { - return nil, fmt.Errorf("Failed to receive file descriptor via abstract unix socket") - } - - file := os.NewFile(uintptr(fd), "") - return file, nil -} - -// AbstractUnixReceiveFdData is a low level function to receive a file descriptor over a unix socket. 
-func AbstractUnixReceiveFdData(sockFD int, numFds int, iov unsafe.Pointer, iovLen int32) (uint64, []C.int, error) { - cfd := make([]C.int, numFds) - skFd := C.int(sockFD) - ret, errno := C.lxc_abstract_unix_recv_fds_iov(skFd, (*C.int)(&cfd[0]), C.int(numFds), (*C.struct_iovec)(iov), C.size_t(iovLen)) - if ret < 0 { - return 0, []C.int{-C.EBADF}, fmt.Errorf("Failed to receive file descriptor via abstract unix socket: errno=%d", errno) - } - - if ret == 0 { - return 0, []C.int{-C.EBADF}, io.EOF - } - - return uint64(ret), cfd, nil -} diff --git a/shared/netutils/network_linux_cgo.go b/shared/netutils/network_linux_cgo.go new file mode 100644 index 0000000000..e161f37de3 --- /dev/null +++ b/shared/netutils/network_linux_cgo.go @@ -0,0 +1,210 @@ +// +build linux +// +build cgo + +package netutils + +import ( + "fmt" + "io" + "net" + "os" + "strings" + "unsafe" + + "github.com/lxc/lxd/shared/api" +) + +/* +#include "unixfd.h" +#include "netns_getifaddrs.c" +*/ +import "C" + +// NetnsGetifaddrs returns a map of InstanceStateNetwork for a particular process. 
+func NetnsGetifaddrs(initPID int32) (map[string]api.InstanceStateNetwork, error) { + var netnsidAware C.bool + var ifaddrs *C.struct_netns_ifaddrs + var netnsID C.__s32 + + if initPID > 0 { + f, err := os.Open(fmt.Sprintf("/proc/%d/ns/net", initPID)) + if err != nil { + return nil, err + } + defer f.Close() + + netnsID = C.netns_get_nsid(C.__s32(f.Fd())) + if netnsID < 0 { + return nil, fmt.Errorf("Failed to retrieve network namespace id") + } + } else { + netnsID = -1 + } + + ret := C.netns_getifaddrs(&ifaddrs, netnsID, &netnsidAware) + if ret < 0 { + return nil, fmt.Errorf("Failed to retrieve network interfaces and addresses") + } + defer C.netns_freeifaddrs(ifaddrs) + + if netnsID >= 0 && !netnsidAware { + return nil, fmt.Errorf("Netlink requests are not fully network namespace id aware") + } + + // We're using the interface name as key here but we should really + // switch to the ifindex at some point to handle ip aliasing correctly. + networks := map[string]api.InstanceStateNetwork{} + + for addr := ifaddrs; addr != nil; addr = addr.ifa_next { + var address [C.INET6_ADDRSTRLEN]C.char + addNetwork, networkExists := networks[C.GoString(addr.ifa_name)] + if !networkExists { + addNetwork = api.InstanceStateNetwork{ + Addresses: []api.InstanceStateNetworkAddress{}, + Counters: api.InstanceStateNetworkCounters{}, + } + } + + // Interface flags + netState := "down" + netType := "unknown" + + if (addr.ifa_flags & C.IFF_BROADCAST) > 0 { + netType = "broadcast" + } + + if (addr.ifa_flags & C.IFF_LOOPBACK) > 0 { + netType = "loopback" + } + + if (addr.ifa_flags & C.IFF_POINTOPOINT) > 0 { + netType = "point-to-point" + } + + if (addr.ifa_flags & C.IFF_UP) > 0 { + netState = "up" + } + addNetwork.State = netState + addNetwork.Type = netType + addNetwork.Mtu = int(addr.ifa_mtu) + + if initPID != 0 && int(addr.ifa_ifindex_peer) > 0 { + hostInterface, err := net.InterfaceByIndex(int(addr.ifa_ifindex_peer)) + if err == nil { + addNetwork.HostName = hostInterface.Name + } + } 
+ + // Addresses + if addr.ifa_addr != nil && (addr.ifa_addr.sa_family == C.AF_INET || addr.ifa_addr.sa_family == C.AF_INET6) { + family := "inet" + if addr.ifa_addr.sa_family == C.AF_INET6 { + family = "inet6" + } + + addrPtr := C.get_addr_ptr(addr.ifa_addr) + if addrPtr == nil { + return nil, fmt.Errorf("Failed to retrieve valid address pointer") + } + + addressStr := C.inet_ntop(C.int(addr.ifa_addr.sa_family), addrPtr, &address[0], C.INET6_ADDRSTRLEN) + if addressStr == nil { + return nil, fmt.Errorf("Failed to retrieve address string") + } + + if addNetwork.Addresses == nil { + addNetwork.Addresses = []api.InstanceStateNetworkAddress{} + } + + goAddrString := C.GoString(addressStr) + scope := "global" + if strings.HasPrefix(goAddrString, "127") { + scope = "local" + } + + if goAddrString == "::1" { + scope = "local" + } + + if strings.HasPrefix(goAddrString, "169.254") { + scope = "link" + } + + if strings.HasPrefix(goAddrString, "fe80:") { + scope = "link" + } + + address := api.InstanceStateNetworkAddress{} + address.Family = family + address.Address = goAddrString + address.Netmask = fmt.Sprintf("%d", int(addr.ifa_prefixlen)) + address.Scope = scope + + addNetwork.Addresses = append(addNetwork.Addresses, address) + } else if addr.ifa_addr != nil && addr.ifa_addr.sa_family == C.AF_PACKET { + if (addr.ifa_flags & C.IFF_LOOPBACK) == 0 { + var buf [1024]C.char + + hwaddr := C.get_packet_address(addr.ifa_addr, &buf[0], 1024) + if hwaddr == nil { + return nil, fmt.Errorf("Failed to retrieve hardware address") + } + + addNetwork.Hwaddr = C.GoString(hwaddr) + } + } + + if addr.ifa_stats_type == C.IFLA_STATS64 { + addNetwork.Counters.BytesReceived = int64(addr.ifa_stats64.rx_bytes) + addNetwork.Counters.BytesSent = int64(addr.ifa_stats64.tx_bytes) + addNetwork.Counters.PacketsReceived = int64(addr.ifa_stats64.rx_packets) + addNetwork.Counters.PacketsSent = int64(addr.ifa_stats64.tx_packets) + } + ifName := C.GoString(addr.ifa_name) + + networks[ifName] = addNetwork + 
} + + return networks, nil +} + +// AbstractUnixSendFd sends a Unix file descriptor over a Unix socket. +func AbstractUnixSendFd(sockFD int, sendFD int) error { + fd := C.int(sendFD) + skFd := C.int(sockFD) + ret := C.lxc_abstract_unix_send_fds(skFd, &fd, C.int(1), nil, C.size_t(0)) + if ret < 0 { + return fmt.Errorf("Failed to send file descriptor via abstract unix socket") + } + + return nil +} + +// AbstractUnixReceiveFd receives a Unix file descriptor from a Unix socket. +func AbstractUnixReceiveFd(sockFD int) (*os.File, error) { + fd := C.int(-1) + skFd := C.int(sockFD) + ret := C.lxc_abstract_unix_recv_fds(skFd, &fd, C.int(1), nil, C.size_t(0)) + if ret < 0 { + return nil, fmt.Errorf("Failed to receive file descriptor via abstract unix socket") + } + + file := os.NewFile(uintptr(fd), "") + return file, nil +} + +// AbstractUnixReceiveFdData is a low level function to receive a file descriptor over a unix socket. +func AbstractUnixReceiveFdData(sockFD int, numFds int, iov unsafe.Pointer, iovLen int32) (uint64, []C.int, error) { + cfd := make([]C.int, numFds) + skFd := C.int(sockFD) + ret, errno := C.lxc_abstract_unix_recv_fds_iov(skFd, (*C.int)(&cfd[0]), C.int(numFds), (*C.struct_iovec)(iov), C.size_t(iovLen)) + if ret < 0 { + return 0, []C.int{-C.EBADF}, fmt.Errorf("Failed to receive file descriptor via abstract unix socket: errno=%d", errno) + } + + if ret == 0 { + return 0, []C.int{-C.EBADF}, io.EOF + } + + return uint64(ret), cfd, nil +} diff --git a/shared/util_linux.go b/shared/util_linux.go index 47d35a0e30..df7a527c3e 100644 --- a/shared/util_linux.go +++ b/shared/util_linux.go @@ -5,15 +5,19 @@ package shared import ( "bufio" "fmt" + "io" "os" "path/filepath" "reflect" "strings" + "sync" + "sync/atomic" "syscall" "unsafe" "golang.org/x/sys/unix" + "github.com/lxc/lxd/shared/logger" "github.com/lxc/lxd/shared/units" ) @@ -493,3 +497,207 @@ func OpenPty(uid, gid int64) (*os.File, *os.File, error) { revert = false return master, slave, nil } + +// 
Extensively commented directly in the code. Please leave the comments! +// Looking at this in a couple of months noone will know why and how this works +// anymore. +func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan struct{}, fd int) <-chan []byte { + if bufferSize <= (128 * 1024) { + bufferSize = (128 * 1024) + } + + ch := make(chan ([]byte)) + + // Takes care that the closeChannel() function is exactly executed once. + // This allows us to avoid using a mutex. + var once sync.Once + closeChannel := func() { + close(ch) + } + + // [1]: This function has just one job: Dealing with the case where we + // are running an interactive shell session where we put a process in + // the background that does hold stdin/stdout open, but does not + // generate any output at all. This case cannot be dealt with in the + // following function call. Here's why: Assume the above case, now the + // attached child (the shell in this example) exits. This will not + // generate any poll() event: We won't get POLLHUP because the + // background process is holding stdin/stdout open and noone is writing + // to it. So we effectively block on GetPollRevents() in the function + // below. Hence, we use another go routine here who's only job is to + // handle that case: When we detect that the child has exited we check + // whether a POLLIN or POLLHUP event has been generated. If not, we know + // that there's nothing buffered on stdout and exit. + var attachedChildIsDead int32 = 0 + go func() { + <-exited + + atomic.StoreInt32(&attachedChildIsDead, 1) + + ret, revents, err := GetPollRevents(fd, 0, (unix.POLLIN | unix.POLLPRI | unix.POLLERR | unix.POLLHUP | unix.POLLRDHUP | unix.POLLNVAL)) + if ret < 0 { + logger.Errorf("Failed to poll(POLLIN | POLLPRI | POLLHUP | POLLRDHUP) on file descriptor: %s.", err) + // Something went wrong so let's exited otherwise we + // end up in an endless loop. 
+ once.Do(closeChannel) + } else if ret > 0 { + if (revents & unix.POLLERR) > 0 { + logger.Warnf("Detected poll(POLLERR) event.") + // Read end has likely been closed so again, + // avoid an endless loop. + once.Do(closeChannel) + } else if (revents & unix.POLLNVAL) > 0 { + logger.Warnf("Detected poll(POLLNVAL) event.") + // Well, someone closed the fd havent they? So + // let's go home. + once.Do(closeChannel) + } + } else if ret == 0 { + logger.Debugf("No data in stdout: exiting.") + once.Do(closeChannel) + } + }() + + go func() { + readSize := (128 * 1024) + offset := 0 + buf := make([]byte, bufferSize) + avoidAtomicLoad := false + + defer once.Do(closeChannel) + for { + nr := 0 + var err error + + ret, revents, err := GetPollRevents(fd, -1, (unix.POLLIN | unix.POLLPRI | unix.POLLERR | unix.POLLHUP | unix.POLLRDHUP | unix.POLLNVAL)) + if ret < 0 { + // This condition is only reached in cases where we are massively f*cked since we even handle + // EINTR in the underlying C wrapper around poll(). So let's exit here. + logger.Errorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. Exiting.", err) + return + } + + // [2]: If the process exits before all its data has been read by us and no other process holds stdin or + // stdout open, then we will observe a (POLLHUP | POLLRDHUP | POLLIN) event. This means, we need to + // keep on reading from the pty file descriptor until we get a simple POLLHUP back. 
+ both := ((revents & (unix.POLLIN | unix.POLLPRI)) > 0) && ((revents & (unix.POLLHUP | unix.POLLRDHUP)) > 0) + if both { + logger.Debugf("Detected poll(POLLIN | POLLPRI | POLLHUP | POLLRDHUP) event.") + read := buf[offset : offset+readSize] + nr, err = r.Read(read) + } + + if (revents & unix.POLLERR) > 0 { + logger.Warnf("Detected poll(POLLERR) event: exiting.") + return + } else if (revents & unix.POLLNVAL) > 0 { + logger.Warnf("Detected poll(POLLNVAL) event: exiting.") + return + } + + if ((revents & (unix.POLLIN | unix.POLLPRI)) > 0) && !both { + // This might appear unintuitive at first but is actually a nice trick: Assume we are running + // a shell session in a container and put a process in the background that is writing to + // stdout. Now assume the attached process (aka the shell in this example) exits because we + // used Ctrl+D to send EOF or something. If no other process would be holding stdout open we + // would expect to observe either a (POLLHUP | POLLRDHUP | POLLIN | POLLPRI) event if there + // is still data buffered from the previous process or a simple (POLLHUP | POLLRDHUP) if + // no data is buffered. The fact that we only observe a (POLLIN | POLLPRI) event means that + // another process is holding stdout open and is writing to it. + // One counter argument that can be leveraged is (brauner looks at tycho :)) + // "Hey, you need to write at least one additional tty buffer to make sure that + // everything that the attached child has written is actually shown." + // The answer to that is: + // "This case can only happen if the process has exited and has left data in stdout which + // would generate a (POLLIN | POLLPRI | POLLHUP | POLLRDHUP) event and this case is already + // handled and triggers another codepath. 
(See [2].)" + if avoidAtomicLoad || atomic.LoadInt32(&attachedChildIsDead) == 1 { + avoidAtomicLoad = true + // Handle race between atomic.StorInt32() in the go routine + // explained in [1] and atomic.LoadInt32() in the go routine + // here: + // We need to check for (POLLHUP | POLLRDHUP) here again since we might + // still be handling a pure POLLIN event from a write prior to the childs + // exit. But the child might have exited right before and performed + // atomic.StoreInt32() to update attachedChildIsDead before we + // performed our atomic.LoadInt32(). This means we accidentally hit this + // codepath and are misinformed about the available poll() events. So we + // need to perform a non-blocking poll() again to exclude that case: + // + // - If we detect no (POLLHUP | POLLRDHUP) event we know the child + // has already exited but someone else is holding stdin/stdout open and + // writing to it. + // Note that his case should only ever be triggered in situations like + // running a shell and doing stuff like: + // > ./lxc exec xen1 -- bash + // root@xen1:~# yes & + // . + // . + // . + // now send Ctrl+D or type "exit". By the time the Ctrl+D/exit event is + // triggered, we will have read all of the childs data it has written to + // stdout and so we can assume that anything that comes now belongs to + // the process that is holding stdin/stdout open. + // + // - If we detect a (POLLHUP | POLLRDHUP) event we know that we've + // hit this codepath on accident caused by the race between + // atomic.StoreInt32() in the go routine explained in [1] and + // atomic.LoadInt32() in this go routine. So the next call to + // GetPollRevents() will either return + // (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) + // or (POLLHUP | POLLRDHUP). Both will trigger another codepath (See [2].) + // that takes care that all data of the child that is buffered in + // stdout is written out. 
+ ret, revents, err := GetPollRevents(fd, 0, (unix.POLLIN | unix.POLLPRI | unix.POLLERR | unix.POLLHUP | unix.POLLRDHUP | unix.POLLNVAL)) + if ret < 0 { + logger.Errorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. Exiting.", err) + return + } else if (revents & (unix.POLLHUP | unix.POLLRDHUP | unix.POLLERR | unix.POLLNVAL)) == 0 { + logger.Debugf("Exiting but background processes are still running.") + return + } + } + read := buf[offset : offset+readSize] + nr, err = r.Read(read) + } + + // The attached process has exited and we have read all data that may have + // been buffered. + if ((revents & (unix.POLLHUP | unix.POLLRDHUP)) > 0) && !both { + logger.Debugf("Detected poll(POLLHUP) event: exiting.") + return + } + + offset += nr + if offset > 0 && (offset+readSize >= bufferSize || err != nil) { + ch <- buf[0:offset] + offset = 0 + buf = make([]byte, bufferSize) + } + } + }() + + return ch +} + +// GetPollRevents poll for events on provided fd. +func GetPollRevents(fd int, timeout int, flags int) (int, int, error) { + pollFd := unix.PollFd{ + Fd: int32(fd), + Events: int16(flags), + Revents: 0, + } + pollFds := []unix.PollFd{pollFd} + + again: + n, err := unix.Poll(pollFds, timeout) + if err != nil { + if err == syscall.EAGAIN || err == syscall.EINTR { + goto again + } + + return -1, -1, err + } + + return n, int(pollFds[0].Revents), err +} diff --git a/shared/util_linux_cgo.go b/shared/util_linux_cgo.go index e986dce8ee..d5e85c3165 100644 --- a/shared/util_linux_cgo.go +++ b/shared/util_linux_cgo.go @@ -5,15 +5,10 @@ package shared import ( "fmt" - "io" "os" - "sync" - "sync/atomic" "unsafe" "golang.org/x/sys/unix" - - "github.com/lxc/lxd/shared/logger" ) /* @@ -38,30 +33,6 @@ import ( #define ABSTRACT_UNIX_SOCK_LEN sizeof(((struct sockaddr_un *)0)->sun_path) -// This is an adaption from https://codereview.appspot.com/4589049, to be -// included in the stdlib with the stdlib's license. 
- -int get_poll_revents(int lfd, int timeout, int flags, int *revents, int *saved_errno) -{ - int ret; - struct pollfd pfd = {lfd, flags, 0}; - -again: - ret = poll(&pfd, 1, timeout); - if (ret < 0) { - if (errno == EINTR || errno == EAGAIN) - goto again; - - *saved_errno = errno; - fprintf(stderr, "Failed to poll() on file descriptor.\n"); - return -1; - } - - *revents = pfd.revents; - - return ret; -} - static int read_pid(int fd) { ssize_t ret; @@ -82,26 +53,6 @@ import "C" const ABSTRACT_UNIX_SOCK_LEN int = C.ABSTRACT_UNIX_SOCK_LEN -const POLLIN int = C.POLLIN -const POLLPRI int = C.POLLPRI -const POLLNVAL int = C.POLLNVAL -const POLLERR int = C.POLLERR -const POLLHUP int = C.POLLHUP -const POLLRDHUP int = C.POLLRDHUP - -func GetPollRevents(fd int, timeout int, flags int) (int, int, error) { - var err error - revents := C.int(0) - saved_errno := C.int(0) - - ret := C.get_poll_revents(C.int(fd), C.int(timeout), C.int(flags), &revents, &saved_errno) - if int(ret) < 0 { - err = unix.Errno(saved_errno) - } - - return int(ret), int(revents), err -} - // UserId is an adaption from https://codereview.appspot.com/4589049. func UserId(name string) (int, error) { var pw C.struct_passwd @@ -198,188 +149,6 @@ again: return int(C.int(result.gr_gid)), nil } -// Extensively commented directly in the code. Please leave the comments! -// Looking at this in a couple of months noone will know why and how this works -// anymore. -func ExecReaderToChannel(r io.Reader, bufferSize int, exited <-chan struct{}, fd int) <-chan []byte { - if bufferSize <= (128 * 1024) { - bufferSize = (128 * 1024) - } - - ch := make(chan ([]byte)) - - // Takes care that the closeChannel() function is exactly executed once. - // This allows us to avoid using a mutex. 
- var once sync.Once - closeChannel := func() { - close(ch) - } - - // [1]: This function has just one job: Dealing with the case where we - // are running an interactive shell session where we put a process in - // the background that does hold stdin/stdout open, but does not - // generate any output at all. This case cannot be dealt with in the - // following function call. Here's why: Assume the above case, now the - // attached child (the shell in this example) exits. This will not - // generate any poll() event: We won't get POLLHUP because the - // background process is holding stdin/stdout open and noone is writing - // to it. So we effectively block on GetPollRevents() in the function - // below. Hence, we use another go routine here who's only job is to - // handle that case: When we detect that the child has exited we check - // whether a POLLIN or POLLHUP event has been generated. If not, we know - // that there's nothing buffered on stdout and exit. - var attachedChildIsDead int32 = 0 - go func() { - <-exited - - atomic.StoreInt32(&attachedChildIsDead, 1) - - ret, revents, err := GetPollRevents(fd, 0, (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) - if ret < 0 { - logger.Errorf("Failed to poll(POLLIN | POLLPRI | POLLHUP | POLLRDHUP) on file descriptor: %s.", err) - // Something went wrong so let's exited otherwise we - // end up in an endless loop. - once.Do(closeChannel) - } else if ret > 0 { - if (revents & POLLERR) > 0 { - logger.Warnf("Detected poll(POLLERR) event.") - // Read end has likely been closed so again, - // avoid an endless loop. - once.Do(closeChannel) - } else if (revents & POLLNVAL) > 0 { - logger.Warnf("Detected poll(POLLNVAL) event.") - // Well, someone closed the fd havent they? So - // let's go home. 
- once.Do(closeChannel) - } - } else if ret == 0 { - logger.Debugf("No data in stdout: exiting.") - once.Do(closeChannel) - } - }() - - go func() { - readSize := (128 * 1024) - offset := 0 - buf := make([]byte, bufferSize) - avoidAtomicLoad := false - - defer once.Do(closeChannel) - for { - nr := 0 - var err error - - ret, revents, err := GetPollRevents(fd, -1, (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) - if ret < 0 { - // This condition is only reached in cases where we are massively f*cked since we even handle - // EINTR in the underlying C wrapper around poll(). So let's exit here. - logger.Errorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. Exiting.", err) - return - } - - // [2]: If the process exits before all its data has been read by us and no other process holds stdin or - // stdout open, then we will observe a (POLLHUP | POLLRDHUP | POLLIN) event. This means, we need to - // keep on reading from the pty file descriptor until we get a simple POLLHUP back. - both := ((revents & (POLLIN | POLLPRI)) > 0) && ((revents & (POLLHUP | POLLRDHUP)) > 0) - if both { - logger.Debugf("Detected poll(POLLIN | POLLPRI | POLLHUP | POLLRDHUP) event.") - read := buf[offset : offset+readSize] - nr, err = r.Read(read) - } - - if (revents & POLLERR) > 0 { - logger.Warnf("Detected poll(POLLERR) event: exiting.") - return - } else if (revents & POLLNVAL) > 0 { - logger.Warnf("Detected poll(POLLNVAL) event: exiting.") - return - } - - if ((revents & (POLLIN | POLLPRI)) > 0) && !both { - // This might appear unintuitive at first but is actually a nice trick: Assume we are running - // a shell session in a container and put a process in the background that is writing to - // stdout. Now assume the attached process (aka the shell in this example) exits because we - // used Ctrl+D to send EOF or something. 
If no other process would be holding stdout open we - // would expect to observe either a (POLLHUP | POLLRDHUP | POLLIN | POLLPRI) event if there - // is still data buffered from the previous process or a simple (POLLHUP | POLLRDHUP) if - // no data is buffered. The fact that we only observe a (POLLIN | POLLPRI) event means that - // another process is holding stdout open and is writing to it. - // One counter argument that can be leveraged is (brauner looks at tycho :)) - // "Hey, you need to write at least one additional tty buffer to make sure that - // everything that the attached child has written is actually shown." - // The answer to that is: - // "This case can only happen if the process has exited and has left data in stdout which - // would generate a (POLLIN | POLLPRI | POLLHUP | POLLRDHUP) event and this case is already - // handled and triggers another codepath. (See [2].)" - if avoidAtomicLoad || atomic.LoadInt32(&attachedChildIsDead) == 1 { - avoidAtomicLoad = true - // Handle race between atomic.StorInt32() in the go routine - // explained in [1] and atomic.LoadInt32() in the go routine - // here: - // We need to check for (POLLHUP | POLLRDHUP) here again since we might - // still be handling a pure POLLIN event from a write prior to the childs - // exit. But the child might have exited right before and performed - // atomic.StoreInt32() to update attachedChildIsDead before we - // performed our atomic.LoadInt32(). This means we accidentally hit this - // codepath and are misinformed about the available poll() events. So we - // need to perform a non-blocking poll() again to exclude that case: - // - // - If we detect no (POLLHUP | POLLRDHUP) event we know the child - // has already exited but someone else is holding stdin/stdout open and - // writing to it. - // Note that his case should only ever be triggered in situations like - // running a shell and doing stuff like: - // > ./lxc exec xen1 -- bash - // root@xen1:~# yes & - // . - // . - // . 
- // now send Ctrl+D or type "exit". By the time the Ctrl+D/exit event is - // triggered, we will have read all of the childs data it has written to - // stdout and so we can assume that anything that comes now belongs to - // the process that is holding stdin/stdout open. - // - // - If we detect a (POLLHUP | POLLRDHUP) event we know that we've - // hit this codepath on accident caused by the race between - // atomic.StoreInt32() in the go routine explained in [1] and - // atomic.LoadInt32() in this go routine. So the next call to - // GetPollRevents() will either return - // (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) - // or (POLLHUP | POLLRDHUP). Both will trigger another codepath (See [2].) - // that takes care that all data of the child that is buffered in - // stdout is written out. - ret, revents, err := GetPollRevents(fd, 0, (POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP | POLLNVAL)) - if ret < 0 { - logger.Errorf("Failed to poll(POLLIN | POLLPRI | POLLERR | POLLHUP | POLLRDHUP) on file descriptor: %s. Exiting.", err) - return - } else if (revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) == 0 { - logger.Debugf("Exiting but background processes are still running.") - return - } - } - read := buf[offset : offset+readSize] - nr, err = r.Read(read) - } - - // The attached process has exited and we have read all data that may have - // been buffered. 
- if ((revents & (POLLHUP | POLLRDHUP)) > 0) && !both { - logger.Debugf("Detected poll(POLLHUP) event: exiting.") - return - } - - offset += nr - if offset > 0 && (offset+readSize >= bufferSize || err != nil) { - ch <- buf[0:offset] - offset = 0 - buf = make([]byte, bufferSize) - } - } - }() - - return ch -} - func ReadPid(r *os.File) int { return int(C.read_pid(C.int(r.Fd()))) } From 65c28c42f68f3ec0c03cc41f9defd1dc6e3716ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Graber?= <stgra...@ubuntu.com> Date: Tue, 19 May 2020 17:13:02 -0400 Subject: [PATCH 2/2] lxd-agent: Build statically MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stéphane Graber <stgra...@ubuntu.com> --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 0e0948f59d..3fbd611381 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ endif go get -t -v -d ./... CC=$(CC) go install -v -tags "$(TAG_SQLITE3)" $(DEBUG) ./... CGO_ENABLED=0 go install -v -tags netgo ./lxd-p2c - go install -v -tags agent ./lxd-agent + CGO_ENABLED=0 go install -v -tags agent,netgo ./lxd-agent @echo "LXD built successfully" .PHONY: client @@ -30,7 +30,7 @@ client: .PHONY: lxd-agent lxd-agent: - go install -v -tags agent ./lxd-agent + CGO_ENABLED=0 go install -v -tags agent,netgo ./lxd-agent @echo "LXD agent built successfully" .PHONY: lxd-p2c
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel