process: add Wait(), Signal() and Pid() methods

Currently we have a problem when buffers are used for std file
descriptors.  These buffers are filled from goroutines (Cmd.goroutine),
and we need to wait them to be sure that all data have been copied.

Signed-off-by: Andrew Vagin <avagin@openvz.org>
This commit is contained in:
Andrey Vagin 2015-02-23 12:26:43 +03:00
parent d1ae7cd673
commit 94fb37f557
9 changed files with 153 additions and 128 deletions

View File

@ -5,8 +5,6 @@
package libcontainer
import (
"os"
"github.com/docker/libcontainer/configs"
)
@ -99,7 +97,7 @@ type Container interface {
// ConfigInvalid - config is invalid,
// ContainerPaused - Container is paused,
// Systemerror - System error.
Start(process *Process) (pid int, err error)
Start(process *Process) (err error)
// Destroys the container after killing all running processes.
//
@ -129,14 +127,6 @@ type Container interface {
// Systemerror - System error.
Resume() error
// Signal sends the specified signal to the init process of the container.
//
// errors:
// ContainerDestroyed - Container no longer exists,
// ContainerPaused - Container is paused,
// Systemerror - System error.
Signal(signal os.Signal) error
// NotifyOOM returns a read-only channel signaling when the container receives an OOM notification.
//
// errors:

View File

@ -77,30 +77,31 @@ func (c *linuxContainer) Stats() (*Stats, error) {
return stats, nil
}
func (c *linuxContainer) Start(process *Process) (int, error) {
func (c *linuxContainer) Start(process *Process) error {
c.m.Lock()
defer c.m.Unlock()
status, err := c.currentStatus()
if err != nil {
return -1, err
return err
}
doInit := status == Destroyed
parent, err := c.newParentProcess(process, doInit)
if err != nil {
return -1, newSystemError(err)
return newSystemError(err)
}
if err := parent.start(); err != nil {
// terminate the process to ensure that it properly is reaped.
if err := parent.terminate(); err != nil {
log.Warn(err)
}
return -1, newSystemError(err)
return newSystemError(err)
}
process.ops = parent
if doInit {
c.updateState(parent)
}
return parent.pid(), nil
return nil
}
func (c *linuxContainer) newParentProcess(p *Process, doInit bool) (parentProcess, error) {
@ -227,15 +228,6 @@ func (c *linuxContainer) Resume() error {
return c.cgroupManager.Freeze(configs.Thawed)
}
func (c *linuxContainer) Signal(signal os.Signal) error {
c.m.Lock()
defer c.m.Unlock()
if c.initProcess == nil {
return newGenericError(nil, ContainerNotRunning)
}
return c.initProcess.signal(signal)
}
func (c *linuxContainer) NotifyOOM() (<-chan struct{}, error) {
return notifyOnOOM(c.cgroupManager.GetPaths())
}

View File

@ -17,6 +17,9 @@ const (
ContainerNotStopped
ContainerNotRunning
// Process errors
ProcessNotExecuted
// Common errors
ConfigInvalid
SystemError

View File

@ -204,11 +204,7 @@ func newTestRoot() (string, error) {
return dir, nil
}
func waitProcess(pid int, t *testing.T) {
p, err := os.FindProcess(pid)
if err != nil {
t.Fatal(err)
}
func waitProcess(p *libcontainer.Process, t *testing.T) {
status, err := p.Wait()
if err != nil {
t.Fatal(err)
@ -261,29 +257,41 @@ func TestEnter(t *testing.T) {
Stdin: stdinR,
Stdout: &stdout,
}
pid, err := container.Start(&pconfig)
err = container.Start(&pconfig)
stdinR.Close()
defer stdinW.Close()
if err != nil {
t.Fatal(err)
}
pid, err := pconfig.Pid()
if err != nil {
t.Fatal(err)
}
// Execute a first process in the container
stdinR2, stdinW2, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
pconfig.Args = []string{"sh", "-c", "cat && readlink /proc/self/ns/pid"}
pconfig.Stdin = stdinR2
pconfig.Stdout = &stdout2
pconfig2 := libcontainer.Process{
Env: standardEnvironment,
}
pconfig2.Args = []string{"sh", "-c", "cat && readlink /proc/self/ns/pid"}
pconfig2.Stdin = stdinR2
pconfig2.Stdout = &stdout2
pid2, err := container.Start(&pconfig)
err = container.Start(&pconfig2)
stdinR2.Close()
defer stdinW2.Close()
if err != nil {
t.Fatal(err)
}
pid2, err := pconfig2.Pid()
if err != nil {
t.Fatal(err)
}
processes, err := container.Processes()
if err != nil {
t.Fatal(err)
@ -301,10 +309,10 @@ func TestEnter(t *testing.T) {
// Wait processes
stdinW2.Close()
waitProcess(pid2, t)
waitProcess(&pconfig2, t)
stdinW.Close()
waitProcess(pid, t)
waitProcess(&pconfig, t)
// Check that both processes live in the same pidns
pidns := string(stdout.Bytes())
@ -361,13 +369,18 @@ func TestFreeze(t *testing.T) {
Env: standardEnvironment,
Stdin: stdinR,
}
pid, err := container.Start(&pconfig)
err = container.Start(&pconfig)
stdinR.Close()
defer stdinW.Close()
if err != nil {
t.Fatal(err)
}
pid, err := pconfig.Pid()
if err != nil {
t.Fatal(err)
}
process, err := os.FindProcess(pid)
if err != nil {
t.Fatal(err)

View File

@ -3,7 +3,6 @@ package integration
import (
"os"
"strings"
"syscall"
"testing"
"github.com/docker/libcontainer"
@ -24,48 +23,45 @@ func TestExecIn(t *testing.T) {
t.Fatal(err)
}
defer container.Destroy()
buffers := newStdBuffers()
process := &libcontainer.Process{
Args: []string{"sleep", "10"},
Env: standardEnvironment,
Stdin: buffers.Stdin,
Stdout: buffers.Stdout,
Stderr: buffers.Stderr,
}
pid1, err := container.Start(process)
// Execute a first process in the container
stdinR, stdinW, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
buffers = newStdBuffers()
psPid, err := container.Start(&libcontainer.Process{
process := &libcontainer.Process{
Args: []string{"cat"},
Env: standardEnvironment,
Stdin: stdinR,
}
err = container.Start(process)
stdinR.Close()
defer stdinW.Close()
if err != nil {
t.Fatal(err)
}
buffers := newStdBuffers()
ps := &libcontainer.Process{
Args: []string{"ps"},
Env: standardEnvironment,
Stdin: buffers.Stdin,
Stdout: buffers.Stdout,
Stderr: buffers.Stderr,
})
if err != nil {
t.Fatal(err)
}
ps, err := os.FindProcess(psPid)
err = container.Start(ps)
if err != nil {
t.Fatal(err)
}
if _, err := ps.Wait(); err != nil {
t.Fatal(err)
}
p, err := os.FindProcess(pid1)
if err != nil {
t.Fatal(err)
}
if err := p.Signal(syscall.SIGKILL); err != nil {
t.Log(err)
}
if _, err := p.Wait(); err != nil {
stdinW.Close()
if _, err := process.Wait(); err != nil {
t.Log(err)
}
out := buffers.Stdout.String()
if !strings.Contains(out, "sleep 10") || !strings.Contains(out, "ps") {
if !strings.Contains(out, "cat") || !strings.Contains(out, "ps") {
t.Fatalf("unexpected running process, output %q", out)
}
}
@ -85,44 +81,40 @@ func TestExecInRlimit(t *testing.T) {
t.Fatal(err)
}
defer container.Destroy()
buffers := newStdBuffers()
process := &libcontainer.Process{
Args: []string{"sleep", "10"},
Env: standardEnvironment,
Stdin: buffers.Stdin,
Stdout: buffers.Stdout,
Stderr: buffers.Stderr,
}
pid1, err := container.Start(process)
stdinR, stdinW, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
buffers = newStdBuffers()
psPid, err := container.Start(&libcontainer.Process{
process := &libcontainer.Process{
Args: []string{"cat"},
Env: standardEnvironment,
Stdin: stdinR,
}
err = container.Start(process)
stdinR.Close()
defer stdinW.Close()
if err != nil {
t.Fatal(err)
}
buffers := newStdBuffers()
ps := &libcontainer.Process{
Args: []string{"/bin/sh", "-c", "ulimit -n"},
Env: standardEnvironment,
Stdin: buffers.Stdin,
Stdout: buffers.Stdout,
Stderr: buffers.Stderr,
})
if err != nil {
t.Fatal(err)
}
ps, err := os.FindProcess(psPid)
err = container.Start(ps)
if err != nil {
t.Fatal(err)
}
if _, err := ps.Wait(); err != nil {
t.Fatal(err)
}
p, err := os.FindProcess(pid1)
if err != nil {
t.Fatal(err)
}
if err := p.Signal(syscall.SIGKILL); err != nil {
t.Log(err)
}
if _, err := p.Wait(); err != nil {
stdinW.Close()
if _, err := process.Wait(); err != nil {
t.Log(err)
}
out := buffers.Stdout.String()

View File

@ -97,15 +97,11 @@ func runContainer(config *configs.Config, console string, args ...string) (buffe
Stderr: buffers.Stderr,
}
pid, err := container.Start(process)
err = container.Start(process)
if err != nil {
return nil, -1, err
}
p, err := os.FindProcess(pid)
if err != nil {
return nil, -1, err
}
ps, err := p.Wait()
ps, err := process.Wait()
if err != nil {
return nil, -1, err
}

View File

@ -60,7 +60,6 @@ func execAction(context *cli.Context) {
fatal(err)
}
}
go handleSignals(container, tty)
process := &libcontainer.Process{
Args: context.Args(),
Env: context.StringSlice("env"),
@ -73,7 +72,8 @@ func execAction(context *cli.Context) {
if err := tty.attach(process); err != nil {
fatal(err)
}
pid, err := container.Start(process)
go handleSignals(process, tty)
err = container.Start(process)
if err != nil {
tty.Close()
if created {
@ -81,15 +81,7 @@ func execAction(context *cli.Context) {
}
fatal(err)
}
proc, err := os.FindProcess(pid)
if err != nil {
tty.Close()
if created {
container.Destroy()
}
fatal(err)
}
status, err := proc.Wait()
status, err := process.Wait()
if err != nil {
tty.Close()
if created {
@ -107,7 +99,7 @@ func execAction(context *cli.Context) {
os.Exit(utils.ExitStatus(status.Sys().(syscall.WaitStatus)))
}
func handleSignals(container libcontainer.Container, tty *tty) {
func handleSignals(container *libcontainer.Process, tty *tty) {
sigc := make(chan os.Signal, 10)
signal.Notify(sigc)
tty.resize()

View File

@ -1,6 +1,15 @@
package libcontainer
import "io"
import (
"io"
"os"
)
type processOperations interface {
wait() (*os.ProcessState, error)
signal(sig os.Signal) error
pid() int
}
// Process specifies the configuration and IO for a process inside
// a container.
@ -26,4 +35,31 @@ type Process struct {
// Stderr is a pointer to a writer which receives the standard error stream.
Stderr io.Writer
ops processOperations
}
// Wait waits for the process to exit.
// Wait releases any resources associated with the Process
func (p Process) Wait() (*os.ProcessState, error) {
if p.ops == nil {
return nil, newGenericError(nil, ProcessNotExecuted)
}
return p.ops.wait()
}
// Pid returns the process ID
func (p Process) Pid() (int, error) {
if p.ops == nil {
return -1, newGenericError(nil, ProcessNotExecuted)
}
return p.ops.pid(), nil
}
// Signal sends a signal to the Process.
func (p Process) Signal(sig os.Signal) error {
if p.ops == nil {
return newGenericError(nil, ProcessNotExecuted)
}
return p.ops.signal(sig)
}

View File

@ -33,12 +33,11 @@ type parentProcess interface {
}
type setnsProcess struct {
cmd *exec.Cmd
parentPipe *os.File
childPipe *os.File
forkedProcess *os.Process
cgroupPaths map[string]string
config *initConfig
cmd *exec.Cmd
parentPipe *os.File
childPipe *os.File
cgroupPaths map[string]string
config *initConfig
}
func (p *setnsProcess) startTime() (string, error) {
@ -46,16 +45,16 @@ func (p *setnsProcess) startTime() (string, error) {
}
func (p *setnsProcess) signal(s os.Signal) error {
return p.forkedProcess.Signal(s)
return p.cmd.Process.Signal(s)
}
func (p *setnsProcess) start() (err error) {
defer p.parentPipe.Close()
if p.forkedProcess, err = p.execSetns(); err != nil {
if err = p.execSetns(); err != nil {
return newSystemError(err)
}
if len(p.cgroupPaths) > 0 {
if err := cgroups.EnterPid(p.cgroupPaths, p.forkedProcess.Pid); err != nil {
if err := cgroups.EnterPid(p.cgroupPaths, p.cmd.Process.Pid); err != nil {
return newSystemError(err)
}
}
@ -69,33 +68,40 @@ func (p *setnsProcess) start() (err error) {
// because setns support requires the C process to fork off a child and perform the setns
// before the go runtime boots, we wait on the process to die and receive the child's pid
// over the provided pipe.
func (p *setnsProcess) execSetns() (*os.Process, error) {
func (p *setnsProcess) execSetns() error {
err := p.cmd.Start()
p.childPipe.Close()
if err != nil {
return nil, newSystemError(err)
return newSystemError(err)
}
status, err := p.cmd.Process.Wait()
if err != nil {
return nil, newSystemError(err)
p.cmd.Wait()
return newSystemError(err)
}
if !status.Success() {
return nil, newSystemError(&exec.ExitError{ProcessState: status})
p.cmd.Wait()
return newSystemError(&exec.ExitError{ProcessState: status})
}
var pid *pid
if err := json.NewDecoder(p.parentPipe).Decode(&pid); err != nil {
return nil, newSystemError(err)
p.cmd.Wait()
return newSystemError(err)
}
return os.FindProcess(pid.Pid)
process, err := os.FindProcess(pid.Pid)
if err != nil {
return err
}
p.cmd.Process = process
return nil
}
// terminate sends a SIGKILL to the forked process for the setns routine then waits to
// avoid the process becomming a zombie.
func (p *setnsProcess) terminate() error {
if p.forkedProcess == nil {
return nil
}
err := p.forkedProcess.Kill()
err := p.cmd.Process.Kill()
if _, werr := p.wait(); err == nil {
err = werr
}
@ -103,11 +109,16 @@ func (p *setnsProcess) terminate() error {
}
func (p *setnsProcess) wait() (*os.ProcessState, error) {
return p.forkedProcess.Wait()
err := p.cmd.Wait()
if err != nil {
return nil, err
}
return p.cmd.ProcessState, nil
}
func (p *setnsProcess) pid() int {
return p.forkedProcess.Pid
return p.cmd.Process.Pid
}
type initProcess struct {
@ -159,7 +170,7 @@ func (p *initProcess) start() error {
}
func (p *initProcess) wait() (*os.ProcessState, error) {
state, err := p.cmd.Process.Wait()
err := p.cmd.Wait()
if err != nil {
return nil, err
}
@ -167,7 +178,7 @@ func (p *initProcess) wait() (*os.ProcessState, error) {
if p.cmd.SysProcAttr.Cloneflags&syscall.CLONE_NEWPID == 0 {
killCgroupProcesses(p.manager)
}
return state, nil
return p.cmd.ProcessState, nil
}
func (p *initProcess) terminate() error {