Merge pull request #252 from crosbymichael/syncpipe

Remove syncpipe pkg
This commit is contained in:
Mrunal Patel 2014-11-05 16:22:23 -08:00
commit 20e4502777
10 changed files with 114 additions and 289 deletions

View File

@ -6,7 +6,6 @@ import (
"runtime"
"github.com/docker/libcontainer/namespaces"
"github.com/docker/libcontainer/syncpipe"
)
// init runs the libcontainer initialization code because of the busybox style needs
@ -27,12 +26,7 @@ func init() {
log.Fatal(err)
}
syncPipe, err := syncpipe.NewSyncPipeFromFd(0, 3)
if err != nil {
log.Fatalf("unable to create sync pipe: %s", err)
}
if err := namespaces.Init(container, rootfs, "", syncPipe, os.Args[3:]); err != nil {
if err := namespaces.Init(container, rootfs, "", os.NewFile(3, "pipe"), os.Args[3:]); err != nil {
log.Fatalf("unable to initialize for container: %s", err)
}
os.Exit(1)

View File

@ -3,6 +3,7 @@
package namespaces
import (
"encoding/json"
"io"
"os"
"os/exec"
@ -13,7 +14,6 @@ import (
"github.com/docker/libcontainer/cgroups/fs"
"github.com/docker/libcontainer/cgroups/systemd"
"github.com/docker/libcontainer/network"
"github.com/docker/libcontainer/syncpipe"
"github.com/docker/libcontainer/system"
)
@ -22,19 +22,17 @@ import (
// Exec performs setup outside of a namespace so that a container can be
// executed. Exec is a high level function for working with container namespaces.
func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Writer, console, dataPath string, args []string, createCommand CreateCommand, startCallback func()) (int, error) {
var (
err error
)
var err error
// create a pipe so that we can syncronize with the namespaced process and
// pass the veth name to the child
syncPipe, err := syncpipe.NewSyncPipe()
// pass the state and configuration to the child process
parent, child, err := newInitPipe()
if err != nil {
return -1, err
}
defer syncPipe.Close()
defer parent.Close()
command := createCommand(container, console, dataPath, os.Args[0], syncPipe.Child(), args)
command := createCommand(container, console, dataPath, os.Args[0], child, args)
// Note: these are only used in non-tty mode
// if there is a tty for the container it will be opened within the namespace and the
// fds will be duped to stdin, stdiout, and stderr
@ -43,39 +41,47 @@ func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Wri
command.Stderr = stderr
if err := command.Start(); err != nil {
child.Close()
return -1, err
}
child.Close()
// Now we passed the pipe to the child, close our side
syncPipe.CloseChild()
terminate := func(terr error) (int, error) {
// TODO: log the errors for kill and wait
command.Process.Kill()
command.Wait()
return -1, terr
}
started, err := system.GetProcessStartTime(command.Process.Pid)
if err != nil {
return -1, err
return terminate(err)
}
// Do this before syncing with child so that no children
// can escape the cgroup
cgroupRef, err := SetupCgroups(container, command.Process.Pid)
if err != nil {
command.Process.Kill()
command.Wait()
return -1, err
return terminate(err)
}
defer cgroupRef.Cleanup()
cgroupPaths, err := cgroupRef.Paths()
if err != nil {
command.Process.Kill()
command.Wait()
return -1, err
return terminate(err)
}
var networkState network.NetworkState
if err := InitializeNetworking(container, command.Process.Pid, syncPipe, &networkState); err != nil {
command.Process.Kill()
command.Wait()
return -1, err
if err := InitializeNetworking(container, command.Process.Pid, &networkState); err != nil {
return terminate(err)
}
// send the state to the container's init process then shutdown writes for the parent
if err := json.NewEncoder(parent).Encode(networkState); err != nil {
return terminate(err)
}
// shutdown writes for the parent side of the pipe
if err := syscall.Shutdown(int(parent.Fd()), syscall.SHUT_WR); err != nil {
return terminate(err)
}
state := &libcontainer.State{
@ -86,17 +92,18 @@ func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Wri
}
if err := libcontainer.SaveState(dataPath, state); err != nil {
command.Process.Kill()
command.Wait()
return -1, err
return terminate(err)
}
defer libcontainer.DeleteState(dataPath)
// Sync with child
if err := syncPipe.ReadFromChild(); err != nil {
command.Process.Kill()
command.Wait()
return -1, err
// wait for the child process to fully complete and receive an error message
// if one was encoutered
var ierr *initError
if err := json.NewDecoder(parent).Decode(&ierr); err != nil && err != io.EOF {
return terminate(err)
}
if ierr != nil {
return terminate(ierr)
}
if startCallback != nil {
@ -108,7 +115,6 @@ func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Wri
return -1, err
}
}
return command.ProcessState.Sys().(syscall.WaitStatus).ExitStatus(), nil
}
@ -129,16 +135,6 @@ func DefaultCreateCommand(container *libcontainer.Config, console, dataPath, ini
"data_path=" + dataPath,
}
/*
TODO: move user and wd into env
if user != "" {
env = append(env, "user="+user)
}
if workingDir != "" {
env = append(env, "wd="+workingDir)
}
*/
command := exec.Command(init, append([]string{"init", "--"}, args...)...)
// make sure the process is executed inside the context of the rootfs
command.Dir = container.RootFs
@ -173,7 +169,7 @@ func SetupCgroups(container *libcontainer.Config, nspid int) (cgroups.ActiveCgro
// InitializeNetworking creates the container's network stack outside of the namespace and moves
// interfaces into the container's net namespaces if necessary
func InitializeNetworking(container *libcontainer.Config, nspid int, pipe *syncpipe.SyncPipe, networkState *network.NetworkState) error {
func InitializeNetworking(container *libcontainer.Config, nspid int, networkState *network.NetworkState) error {
for _, config := range container.Networks {
strategy, err := network.GetStrategy(config.Type)
if err != nil {
@ -183,18 +179,5 @@ func InitializeNetworking(container *libcontainer.Config, nspid int, pipe *syncp
return err
}
}
return pipe.SendToChild(networkState)
}
// GetNamespaceFlags parses the container's Namespaces options to set the correct
// flags on clone, unshare, and setns
func GetNamespaceFlags(namespaces map[string]bool) (flag int) {
for key, enabled := range namespaces {
if enabled {
if ns := GetNamespace(key); ns != nil {
flag |= ns.Value
}
}
}
return flag
return nil
}

View File

@ -3,6 +3,7 @@
package namespaces
import (
"encoding/json"
"fmt"
"io"
"os"
@ -15,7 +16,6 @@ import (
"github.com/docker/libcontainer/apparmor"
"github.com/docker/libcontainer/cgroups"
"github.com/docker/libcontainer/label"
"github.com/docker/libcontainer/syncpipe"
"github.com/docker/libcontainer/system"
)
@ -41,11 +41,11 @@ func ExecIn(container *libcontainer.Config, state *libcontainer.State, userArgs
}
}
pipe, err := syncpipe.NewSyncPipe()
parent, child, err := newInitPipe()
if err != nil {
return -1, err
}
defer pipe.Close()
defer parent.Close()
// Note: these are only used in non-tty mode
// if there is a tty for the container it will be opened within the namespace and the
@ -53,23 +53,28 @@ func ExecIn(container *libcontainer.Config, state *libcontainer.State, userArgs
cmd.Stdin = stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.ExtraFiles = []*os.File{pipe.Child()}
cmd.ExtraFiles = []*os.File{child}
if err := cmd.Start(); err != nil {
child.Close()
return -1, err
}
pipe.CloseChild()
child.Close()
terminate := func(terr error) (int, error) {
// TODO: log the errors for kill and wait
cmd.Process.Kill()
cmd.Wait()
return -1, terr
}
// Enter cgroups.
if err := EnterCgroups(state, cmd.Process.Pid); err != nil {
return -1, err
return terminate(err)
}
if err := pipe.SendToChild(container); err != nil {
cmd.Process.Kill()
cmd.Wait()
return -1, err
if err := json.NewEncoder(parent).Encode(container); err != nil {
return terminate(err)
}
if startCallback != nil {
@ -81,7 +86,6 @@ func ExecIn(container *libcontainer.Config, state *libcontainer.State, userArgs
return -1, err
}
}
return cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus(), nil
}

View File

@ -3,7 +3,9 @@
package namespaces
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"syscall"
@ -18,7 +20,6 @@ import (
"github.com/docker/libcontainer/network"
"github.com/docker/libcontainer/security/capabilities"
"github.com/docker/libcontainer/security/restrict"
"github.com/docker/libcontainer/syncpipe"
"github.com/docker/libcontainer/system"
"github.com/docker/libcontainer/user"
"github.com/docker/libcontainer/utils"
@ -30,11 +31,22 @@ import (
// and other options required for the new container.
// The caller of Init function has to ensure that the go runtime is locked to an OS thread
// (using runtime.LockOSThread) else system calls like setns called within Init may not work as intended.
func Init(container *libcontainer.Config, uncleanRootfs, consolePath string, syncPipe *syncpipe.SyncPipe, args []string) (err error) {
func Init(container *libcontainer.Config, uncleanRootfs, consolePath string, pipe *os.File, args []string) (err error) {
defer func() {
// if we have an error during the initialization of the container's init then send it back to the
// parent process in the form of an initError.
if err != nil {
syncPipe.ReportChildError(err)
// ensure that any data sent from the parent is consumed so it doesn't
// receive ECONNRESET when the child writes to the pipe.
ioutil.ReadAll(pipe)
if err := json.NewEncoder(pipe).Encode(initError{
Message: err.Error(),
}); err != nil {
panic(err)
}
}
// ensure that this pipe is always closed
pipe.Close()
}()
rootfs, err := utils.ResolveRootfs(uncleanRootfs)
@ -50,7 +62,7 @@ func Init(container *libcontainer.Config, uncleanRootfs, consolePath string, syn
// We always read this as it is a way to sync with the parent as well
var networkState *network.NetworkState
if err := syncPipe.ReadFromParent(&networkState); err != nil {
if err := json.NewDecoder(pipe).Decode(&networkState); err != nil {
return err
}

38
namespaces/utils.go Normal file
View File

@ -0,0 +1,38 @@
// +build linux
package namespaces
import (
"os"
"syscall"
)
type initError struct {
Message string `json:"message,omitempty"`
}
func (i initError) Error() string {
return i.Message
}
// New returns a newly initialized Pipe for communication between processes
func newInitPipe() (parent *os.File, child *os.File, err error) {
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
if err != nil {
return nil, nil, err
}
return os.NewFile(uintptr(fds[1]), "parent"), os.NewFile(uintptr(fds[0]), "child"), nil
}
// GetNamespaceFlags parses the container's Namespaces options to set the correct
// flags on clone, unshare, and setns
func GetNamespaceFlags(namespaces map[string]bool) (flag int) {
for key, enabled := range namespaces {
if enabled {
if ns := GetNamespace(key); ns != nil {
flag |= ns.Value
}
}
}
return flag
}

View File

@ -8,7 +8,6 @@ import (
"github.com/codegangsta/cli"
"github.com/docker/libcontainer/namespaces"
"github.com/docker/libcontainer/syncpipe"
)
var (
@ -41,12 +40,8 @@ func initAction(context *cli.Context) {
log.Fatal(err)
}
syncPipe, err := syncpipe.NewSyncPipeFromFd(0, uintptr(pipeFd))
if err != nil {
log.Fatalf("unable to create sync pipe: %s", err)
}
if err := namespaces.Init(container, rootfs, console, syncPipe, []string(context.Args())); err != nil {
pipe := os.NewFile(uintptr(pipeFd), "pipe")
if err := namespaces.Init(container, rootfs, console, pipe, []string(context.Args())); err != nil {
log.Fatalf("unable to initialize for container: %s", err)
}
}

View File

@ -8,7 +8,6 @@ import (
"github.com/codegangsta/cli"
"github.com/docker/libcontainer"
"github.com/docker/libcontainer/syncpipe"
)
// rFunc is a function registration for calling after an execin
@ -59,16 +58,13 @@ func findUserArgs() []string {
// loadConfigFromFd loads a container's config from the sync pipe that is provided by
// fd 3 when running a process
func loadConfigFromFd() (*libcontainer.Config, error) {
syncPipe, err := syncpipe.NewSyncPipeFromFd(0, 3)
if err != nil {
return nil, err
}
pipe := os.NewFile(3, "pipe")
defer pipe.Close()
var config *libcontainer.Config
if err := syncPipe.ReadFromParent(&config); err != nil {
if err := json.NewDecoder(pipe).Decode(&config); err != nil {
return nil, err
}
return config, nil
}

View File

@ -1,105 +0,0 @@
package syncpipe
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"syscall"
)
// SyncPipe allows communication to and from the child processes
// to it's parent and allows the two independent processes to
// syncronize their state.
type SyncPipe struct {
parent, child *os.File
}
func NewSyncPipeFromFd(parentFd, childFd uintptr) (*SyncPipe, error) {
s := &SyncPipe{}
if parentFd > 0 {
s.parent = os.NewFile(parentFd, "parentPipe")
} else if childFd > 0 {
s.child = os.NewFile(childFd, "childPipe")
} else {
return nil, fmt.Errorf("no valid sync pipe fd specified")
}
return s, nil
}
func (s *SyncPipe) Child() *os.File {
return s.child
}
func (s *SyncPipe) Parent() *os.File {
return s.parent
}
func (s *SyncPipe) SendToChild(v interface{}) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
s.parent.Write(data)
return syscall.Shutdown(int(s.parent.Fd()), syscall.SHUT_WR)
}
func (s *SyncPipe) ReadFromChild() error {
data, err := ioutil.ReadAll(s.parent)
if err != nil {
return err
}
if len(data) > 0 {
return fmt.Errorf("%s", data)
}
return nil
}
func (s *SyncPipe) ReadFromParent(v interface{}) error {
data, err := ioutil.ReadAll(s.child)
if err != nil {
return fmt.Errorf("error reading from sync pipe %s", err)
}
if len(data) > 0 {
if err := json.Unmarshal(data, v); err != nil {
return err
}
}
return nil
}
func (s *SyncPipe) ReportChildError(err error) {
// ensure that any data sent from the parent is consumed so it doesn't
// receive ECONNRESET when the child writes to the pipe.
ioutil.ReadAll(s.child)
s.child.Write([]byte(err.Error()))
s.CloseChild()
}
func (s *SyncPipe) Close() error {
if s.parent != nil {
s.parent.Close()
}
if s.child != nil {
s.child.Close()
}
return nil
}
func (s *SyncPipe) CloseChild() {
if s.child != nil {
s.child.Close()
s.child = nil
}
}

View File

@ -1,20 +0,0 @@
package syncpipe
import (
"os"
"syscall"
)
func NewSyncPipe() (s *SyncPipe, err error) {
s = &SyncPipe{}
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
if err != nil {
return nil, err
}
s.child = os.NewFile(uintptr(fds[0]), "child syncpipe")
s.parent = os.NewFile(uintptr(fds[1]), "parent syncpipe")
return s, nil
}

View File

@ -1,72 +0,0 @@
package syncpipe
import (
"fmt"
"syscall"
"testing"
)
type testStruct struct {
Name string
}
func TestSendErrorFromChild(t *testing.T) {
pipe, err := NewSyncPipe()
if err != nil {
t.Fatal(err)
}
defer func() {
if err := pipe.Close(); err != nil {
t.Fatal(err)
}
}()
childfd, err := syscall.Dup(int(pipe.Child().Fd()))
if err != nil {
t.Fatal(err)
}
childPipe, _ := NewSyncPipeFromFd(0, uintptr(childfd))
pipe.CloseChild()
pipe.SendToChild(nil)
expected := "something bad happened"
childPipe.ReportChildError(fmt.Errorf(expected))
childError := pipe.ReadFromChild()
if childError == nil {
t.Fatal("expected an error to be returned but did not receive anything")
}
if childError.Error() != expected {
t.Fatalf("expected %q but received error message %q", expected, childError.Error())
}
}
func TestSendPayloadToChild(t *testing.T) {
pipe, err := NewSyncPipe()
if err != nil {
t.Fatal(err)
}
defer func() {
if err := pipe.Close(); err != nil {
t.Fatal(err)
}
}()
expected := "libcontainer"
if err := pipe.SendToChild(testStruct{Name: expected}); err != nil {
t.Fatal(err)
}
var s *testStruct
if err := pipe.ReadFromParent(&s); err != nil {
t.Fatal(err)
}
if s.Name != expected {
t.Fatalf("expected name %q but received %q", expected, s.Name)
}
}