From 94fb37f5573e1484ba686b195079684cace18eb0 Mon Sep 17 00:00:00 2001 From: Andrey Vagin Date: Mon, 23 Feb 2015 12:26:43 +0300 Subject: [PATCH] process: add Wait(), Signal() and Pid() methods Currently we have a problem when buffers are used for std file descriptors. These buffers are filled from goroutines (Cmd.goroutine), and we need to wait them to be sure that all data have been copied. Signed-off-by: Andrew Vagin --- container.go | 12 +----- container_linux.go | 20 +++------ error.go | 3 ++ integration/exec_test.go | 39 +++++++++++------ integration/execin_test.go | 88 +++++++++++++++++--------------------- integration/utils_test.go | 8 +--- nsinit/exec.go | 16 ++----- process.go | 38 +++++++++++++++- process_linux.go | 57 ++++++++++++++---------- 9 files changed, 153 insertions(+), 128 deletions(-) diff --git a/container.go b/container.go index 2d4cbcad..ef6f1dc7 100644 --- a/container.go +++ b/container.go @@ -5,8 +5,6 @@ package libcontainer import ( - "os" - "github.com/docker/libcontainer/configs" ) @@ -99,7 +97,7 @@ type Container interface { // ConfigInvalid - config is invalid, // ContainerPaused - Container is paused, // Systemerror - System error. - Start(process *Process) (pid int, err error) + Start(process *Process) (err error) // Destroys the container after killing all running processes. // @@ -129,14 +127,6 @@ type Container interface { // Systemerror - System error. Resume() error - // Signal sends the specified signal to the init process of the container. - // - // errors: - // ContainerDestroyed - Container no longer exists, - // ContainerPaused - Container is paused, - // Systemerror - System error. - Signal(signal os.Signal) error - // NotifyOOM returns a read-only channel signaling when the container receives an OOM notification. // // errors: diff --git a/container_linux.go b/container_linux.go index 52d9af87..ef479710 100644 --- a/container_linux.go +++ b/container_linux.go @@ -77,30 +77,31 @@ func (c *linuxContainer) Stats() (*Stats, error) { return stats, nil } -func (c *linuxContainer) Start(process *Process) (int, error) { +func (c *linuxContainer) Start(process *Process) error { c.m.Lock() defer c.m.Unlock() status, err := c.currentStatus() if err != nil { - return -1, err + return err } doInit := status == Destroyed parent, err := c.newParentProcess(process, doInit) if err != nil { - return -1, newSystemError(err) + return newSystemError(err) } if err := parent.start(); err != nil { // terminate the process to ensure that it properly is reaped. if err := parent.terminate(); err != nil { log.Warn(err) } - return -1, newSystemError(err) + return newSystemError(err) } + process.ops = parent if doInit { c.updateState(parent) } - return parent.pid(), nil + return nil } func (c *linuxContainer) newParentProcess(p *Process, doInit bool) (parentProcess, error) { @@ -227,15 +228,6 @@ func (c *linuxContainer) Resume() error { return c.cgroupManager.Freeze(configs.Thawed) } -func (c *linuxContainer) Signal(signal os.Signal) error { - c.m.Lock() - defer c.m.Unlock() - if c.initProcess == nil { - return newGenericError(nil, ContainerNotRunning) - } - return c.initProcess.signal(signal) -} - func (c *linuxContainer) NotifyOOM() (<-chan struct{}, error) { return notifyOnOOM(c.cgroupManager.GetPaths()) } diff --git a/error.go b/error.go index 37e99366..6c266620 100644 --- a/error.go +++ b/error.go @@ -17,6 +17,9 @@ const ( ContainerNotStopped ContainerNotRunning + // Process errors + ProcessNotExecuted + // Common errors ConfigInvalid SystemError diff --git a/integration/exec_test.go b/integration/exec_test.go index 1b04ff50..3d425e4b 100644 --- a/integration/exec_test.go +++ b/integration/exec_test.go @@ -204,11 +204,7 @@ func newTestRoot() (string, error) { return dir, nil } -func waitProcess(pid int, t *testing.T) { - p, err := os.FindProcess(pid) - if err != nil { - t.Fatal(err) - } +func waitProcess(p *libcontainer.Process, t *testing.T) { status, err := p.Wait() if err != nil { t.Fatal(err) @@ -261,29 +257,41 @@ func TestEnter(t *testing.T) { Stdin: stdinR, Stdout: &stdout, } - pid, err := container.Start(&pconfig) + err = container.Start(&pconfig) stdinR.Close() defer stdinW.Close() if err != nil { t.Fatal(err) } + pid, err := pconfig.Pid() + if err != nil { + t.Fatal(err) + } // Execute a first process in the container stdinR2, stdinW2, err := os.Pipe() if err != nil { t.Fatal(err) } - pconfig.Args = []string{"sh", "-c", "cat && readlink /proc/self/ns/pid"} - pconfig.Stdin = stdinR2 - pconfig.Stdout = &stdout2 + pconfig2 := libcontainer.Process{ + Env: standardEnvironment, + } + pconfig2.Args = []string{"sh", "-c", "cat && readlink /proc/self/ns/pid"} + pconfig2.Stdin = stdinR2 + pconfig2.Stdout = &stdout2 - pid2, err := container.Start(&pconfig) + err = container.Start(&pconfig2) stdinR2.Close() defer stdinW2.Close() if err != nil { t.Fatal(err) } + pid2, err := pconfig2.Pid() + if err != nil { + t.Fatal(err) + } + processes, err := container.Processes() if err != nil { t.Fatal(err) @@ -301,10 +309,10 @@ func TestEnter(t *testing.T) { // Wait processes stdinW2.Close() - waitProcess(pid2, t) + waitProcess(&pconfig2, t) stdinW.Close() - waitProcess(pid, t) + waitProcess(&pconfig, t) // Check that both processes live in the same pidns pidns := string(stdout.Bytes()) @@ -361,13 +369,18 @@ func TestFreeze(t *testing.T) { Env: standardEnvironment, Stdin: stdinR, } - pid, err := container.Start(&pconfig) + err = container.Start(&pconfig) stdinR.Close() defer stdinW.Close() if err != nil { t.Fatal(err) } + pid, err := pconfig.Pid() + if err != nil { + t.Fatal(err) + } + process, err := os.FindProcess(pid) if err != nil { t.Fatal(err) diff --git a/integration/execin_test.go b/integration/execin_test.go index 3f015d34..3a1c5387 100644 --- a/integration/execin_test.go +++ b/integration/execin_test.go @@ -3,7 +3,6 @@ package integration import ( "os" "strings" - "syscall" "testing" "github.com/docker/libcontainer" @@ -24,48 +23,45 @@ func TestExecIn(t *testing.T) { t.Fatal(err) } defer container.Destroy() - buffers := newStdBuffers() - process := &libcontainer.Process{ - Args: []string{"sleep", "10"}, - Env: standardEnvironment, - Stdin: buffers.Stdin, - Stdout: buffers.Stdout, - Stderr: buffers.Stderr, - } - pid1, err := container.Start(process) + + // Execute a first process in the container + stdinR, stdinW, err := os.Pipe() if err != nil { t.Fatal(err) } - buffers = newStdBuffers() - psPid, err := container.Start(&libcontainer.Process{ + process := &libcontainer.Process{ + Args: []string{"cat"}, + Env: standardEnvironment, + Stdin: stdinR, + } + err = container.Start(process) + stdinR.Close() + defer stdinW.Close() + if err != nil { + t.Fatal(err) + } + + buffers := newStdBuffers() + ps := &libcontainer.Process{ Args: []string{"ps"}, Env: standardEnvironment, Stdin: buffers.Stdin, Stdout: buffers.Stdout, Stderr: buffers.Stderr, - }) - if err != nil { - t.Fatal(err) } - ps, err := os.FindProcess(psPid) + err = container.Start(ps) if err != nil { t.Fatal(err) } if _, err := ps.Wait(); err != nil { t.Fatal(err) } - p, err := os.FindProcess(pid1) - if err != nil { - t.Fatal(err) - } - if err := p.Signal(syscall.SIGKILL); err != nil { - t.Log(err) - } - if _, err := p.Wait(); err != nil { + stdinW.Close() + if _, err := process.Wait(); err != nil { t.Log(err) } out := buffers.Stdout.String() - if !strings.Contains(out, "sleep 10") || !strings.Contains(out, "ps") { + if !strings.Contains(out, "cat") || !strings.Contains(out, "ps") { t.Fatalf("unexpected running process, output %q", out) } } @@ -85,44 +81,40 @@ func TestExecInRlimit(t *testing.T) { t.Fatal(err) } defer container.Destroy() - buffers := newStdBuffers() - process := &libcontainer.Process{ - Args: []string{"sleep", "10"}, - Env: standardEnvironment, - Stdin: buffers.Stdin, - Stdout: buffers.Stdout, - Stderr: buffers.Stderr, - } - pid1, err := container.Start(process) + + stdinR, stdinW, err := os.Pipe() if err != nil { t.Fatal(err) } - buffers = newStdBuffers() - psPid, err := container.Start(&libcontainer.Process{ + process := &libcontainer.Process{ + Args: []string{"cat"}, + Env: standardEnvironment, + Stdin: stdinR, + } + err = container.Start(process) + stdinR.Close() + defer stdinW.Close() + if err != nil { + t.Fatal(err) + } + + buffers := newStdBuffers() + ps := &libcontainer.Process{ Args: []string{"/bin/sh", "-c", "ulimit -n"}, Env: standardEnvironment, Stdin: buffers.Stdin, Stdout: buffers.Stdout, Stderr: buffers.Stderr, - }) - if err != nil { - t.Fatal(err) } - ps, err := os.FindProcess(psPid) + err = container.Start(ps) if err != nil { t.Fatal(err) } if _, err := ps.Wait(); err != nil { t.Fatal(err) } - p, err := os.FindProcess(pid1) - if err != nil { - t.Fatal(err) - } - if err := p.Signal(syscall.SIGKILL); err != nil { - t.Log(err) - } - if _, err := p.Wait(); err != nil { + stdinW.Close() + if _, err := process.Wait(); err != nil { t.Log(err) } out := buffers.Stdout.String() diff --git a/integration/utils_test.go b/integration/utils_test.go index d31bd480..c444eecf 100644 --- a/integration/utils_test.go +++ b/integration/utils_test.go @@ -97,15 +97,11 @@ func runContainer(config *configs.Config, console string, args ...string) (buffe Stderr: buffers.Stderr, } - pid, err := container.Start(process) + err = container.Start(process) if err != nil { return nil, -1, err } - p, err := os.FindProcess(pid) - if err != nil { - return nil, -1, err - } - ps, err := p.Wait() + ps, err := process.Wait() if err != nil { return nil, -1, err } diff --git a/nsinit/exec.go b/nsinit/exec.go index 52f25144..3036cd46 100644 --- a/nsinit/exec.go +++ b/nsinit/exec.go @@ -60,7 +60,6 @@ func execAction(context *cli.Context) { fatal(err) } } - go handleSignals(container, tty) process := &libcontainer.Process{ Args: context.Args(), Env: context.StringSlice("env"), @@ -73,7 +72,8 @@ func execAction(context *cli.Context) { if err := tty.attach(process); err != nil { fatal(err) } - pid, err := container.Start(process) + go handleSignals(process, tty) + err = container.Start(process) if err != nil { tty.Close() if created { @@ -81,15 +81,7 @@ func execAction(context *cli.Context) { } fatal(err) } - proc, err := os.FindProcess(pid) - if err != nil { - tty.Close() - if created { - container.Destroy() - } - fatal(err) - } - status, err := proc.Wait() + status, err := process.Wait() if err != nil { tty.Close() if created { @@ -107,7 +99,7 @@ func execAction(context *cli.Context) { os.Exit(utils.ExitStatus(status.Sys().(syscall.WaitStatus))) } -func handleSignals(container libcontainer.Container, tty *tty) { +func handleSignals(container *libcontainer.Process, tty *tty) { sigc := make(chan os.Signal, 10) signal.Notify(sigc) tty.resize() diff --git a/process.go b/process.go index d361f98f..94648925 100644 --- a/process.go +++ b/process.go @@ -1,6 +1,15 @@ package libcontainer -import "io" +import ( + "io" + "os" +) + +type processOperations interface { + wait() (*os.ProcessState, error) + signal(sig os.Signal) error + pid() int +} // Process specifies the configuration and IO for a process inside // a container. @@ -26,4 +35,31 @@ type Process struct { // Stderr is a pointer to a writer which receives the standard error stream. Stderr io.Writer + + ops processOperations +} + +// Wait waits for the process to exit. +// Wait releases any resources associated with the Process +func (p Process) Wait() (*os.ProcessState, error) { + if p.ops == nil { + return nil, newGenericError(nil, ProcessNotExecuted) + } + return p.ops.wait() +} + +// Pid returns the process ID +func (p Process) Pid() (int, error) { + if p.ops == nil { + return -1, newGenericError(nil, ProcessNotExecuted) + } + return p.ops.pid(), nil +} + +// Signal sends a signal to the Process. +func (p Process) Signal(sig os.Signal) error { + if p.ops == nil { + return newGenericError(nil, ProcessNotExecuted) + } + return p.ops.signal(sig) } diff --git a/process_linux.go b/process_linux.go index 0fd33b92..ec26eee2 100644 --- a/process_linux.go +++ b/process_linux.go @@ -33,12 +33,11 @@ type parentProcess interface { } type setnsProcess struct { - cmd *exec.Cmd - parentPipe *os.File - childPipe *os.File - forkedProcess *os.Process - cgroupPaths map[string]string - config *initConfig + cmd *exec.Cmd + parentPipe *os.File + childPipe *os.File + cgroupPaths map[string]string + config *initConfig } func (p *setnsProcess) startTime() (string, error) { @@ -46,16 +45,16 @@ func (p *setnsProcess) startTime() (string, error) { } func (p *setnsProcess) signal(s os.Signal) error { - return p.forkedProcess.Signal(s) + return p.cmd.Process.Signal(s) } func (p *setnsProcess) start() (err error) { defer p.parentPipe.Close() - if p.forkedProcess, err = p.execSetns(); err != nil { + if err = p.execSetns(); err != nil { return newSystemError(err) } if len(p.cgroupPaths) > 0 { - if err := cgroups.EnterPid(p.cgroupPaths, p.forkedProcess.Pid); err != nil { + if err := cgroups.EnterPid(p.cgroupPaths, p.cmd.Process.Pid); err != nil { return newSystemError(err) } } @@ -69,33 +68,40 @@ func (p *setnsProcess) start() (err error) { // because setns support requires the C process to fork off a child and perform the setns // before the go runtime boots, we wait on the process to die and receive the child's pid // over the provided pipe. -func (p *setnsProcess) execSetns() (*os.Process, error) { +func (p *setnsProcess) execSetns() error { err := p.cmd.Start() p.childPipe.Close() if err != nil { - return nil, newSystemError(err) + return newSystemError(err) } status, err := p.cmd.Process.Wait() if err != nil { - return nil, newSystemError(err) + p.cmd.Wait() + return newSystemError(err) } if !status.Success() { - return nil, newSystemError(&exec.ExitError{ProcessState: status}) + p.cmd.Wait() + return newSystemError(&exec.ExitError{ProcessState: status}) } var pid *pid if err := json.NewDecoder(p.parentPipe).Decode(&pid); err != nil { - return nil, newSystemError(err) + p.cmd.Wait() + return newSystemError(err) } - return os.FindProcess(pid.Pid) + + process, err := os.FindProcess(pid.Pid) + if err != nil { + return err + } + + p.cmd.Process = process + return nil } // terminate sends a SIGKILL to the forked process for the setns routine then waits to // avoid the process becomming a zombie. func (p *setnsProcess) terminate() error { - if p.forkedProcess == nil { - return nil - } - err := p.forkedProcess.Kill() + err := p.cmd.Process.Kill() if _, werr := p.wait(); err == nil { err = werr } @@ -103,11 +109,16 @@ func (p *setnsProcess) terminate() error { } func (p *setnsProcess) wait() (*os.ProcessState, error) { - return p.forkedProcess.Wait() + err := p.cmd.Wait() + if err != nil { + return nil, err + } + + return p.cmd.ProcessState, nil } func (p *setnsProcess) pid() int { - return p.forkedProcess.Pid + return p.cmd.Process.Pid } type initProcess struct { @@ -159,7 +170,7 @@ func (p *initProcess) start() error { } func (p *initProcess) wait() (*os.ProcessState, error) { - state, err := p.cmd.Process.Wait() + err := p.cmd.Wait() if err != nil { return nil, err } @@ -167,7 +178,7 @@ func (p *initProcess) wait() (*os.ProcessState, error) { if p.cmd.SysProcAttr.Cloneflags&syscall.CLONE_NEWPID == 0 { killCgroupProcesses(p.manager) } - return state, nil + return p.cmd.ProcessState, nil } func (p *initProcess) terminate() error {