Remove syncpipe pkg

This removes the entire syncpipe package and replaces it with standard
operations on the pipes.  The syncpipe type just never felt right and
probably should not have been there.

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2014-11-04 02:18:55 +00:00
parent ee6f15aabc
commit 2be676643e
10 changed files with 114 additions and 289 deletions

View File

@ -6,7 +6,6 @@ import (
"runtime"
"github.com/docker/libcontainer/namespaces"
"github.com/docker/libcontainer/syncpipe"
)
// init runs the libcontainer initialization code because of the busybox style needs
@ -27,12 +26,7 @@ func init() {
log.Fatal(err)
}
syncPipe, err := syncpipe.NewSyncPipeFromFd(0, 3)
if err != nil {
log.Fatalf("unable to create sync pipe: %s", err)
}
if err := namespaces.Init(container, rootfs, "", syncPipe, os.Args[3:]); err != nil {
if err := namespaces.Init(container, rootfs, "", os.NewFile(3, "pipe"), os.Args[3:]); err != nil {
log.Fatalf("unable to initialize for container: %s", err)
}
os.Exit(1)

View File

@ -3,6 +3,7 @@
package namespaces
import (
"encoding/json"
"io"
"os"
"os/exec"
@ -13,7 +14,6 @@ import (
"github.com/docker/libcontainer/cgroups/fs"
"github.com/docker/libcontainer/cgroups/systemd"
"github.com/docker/libcontainer/network"
"github.com/docker/libcontainer/syncpipe"
"github.com/docker/libcontainer/system"
)
@ -22,19 +22,17 @@ import (
// Exec performs setup outside of a namespace so that a container can be
// executed. Exec is a high level function for working with container namespaces.
func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Writer, console, dataPath string, args []string, createCommand CreateCommand, startCallback func()) (int, error) {
var (
err error
)
var err error
// create a pipe so that we can syncronize with the namespaced process and
// pass the veth name to the child
syncPipe, err := syncpipe.NewSyncPipe()
// pass the state and configuration to the child process
parent, child, err := newInitPipe()
if err != nil {
return -1, err
}
defer syncPipe.Close()
defer parent.Close()
command := createCommand(container, console, dataPath, os.Args[0], syncPipe.Child(), args)
command := createCommand(container, console, dataPath, os.Args[0], child, args)
// Note: these are only used in non-tty mode
// if there is a tty for the container it will be opened within the namespace and the
// fds will be duped to stdin, stdiout, and stderr
@ -43,39 +41,47 @@ func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Wri
command.Stderr = stderr
if err := command.Start(); err != nil {
child.Close()
return -1, err
}
child.Close()
// Now we passed the pipe to the child, close our side
syncPipe.CloseChild()
terminate := func(terr error) (int, error) {
// TODO: log the errors for kill and wait
command.Process.Kill()
command.Wait()
return -1, terr
}
started, err := system.GetProcessStartTime(command.Process.Pid)
if err != nil {
return -1, err
return terminate(err)
}
// Do this before syncing with child so that no children
// can escape the cgroup
cgroupRef, err := SetupCgroups(container, command.Process.Pid)
if err != nil {
command.Process.Kill()
command.Wait()
return -1, err
return terminate(err)
}
defer cgroupRef.Cleanup()
cgroupPaths, err := cgroupRef.Paths()
if err != nil {
command.Process.Kill()
command.Wait()
return -1, err
return terminate(err)
}
var networkState network.NetworkState
if err := InitializeNetworking(container, command.Process.Pid, syncPipe, &networkState); err != nil {
command.Process.Kill()
command.Wait()
return -1, err
if err := InitializeNetworking(container, command.Process.Pid, &networkState); err != nil {
return terminate(err)
}
// send the state to the container's init process then shutdown writes for the parent
if err := json.NewEncoder(parent).Encode(networkState); err != nil {
return terminate(err)
}
// shutdown writes for the parent side of the pipe
if err := syscall.Shutdown(int(parent.Fd()), syscall.SHUT_WR); err != nil {
return terminate(err)
}
state := &libcontainer.State{
@ -86,17 +92,18 @@ func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Wri
}
if err := libcontainer.SaveState(dataPath, state); err != nil {
command.Process.Kill()
command.Wait()
return -1, err
return terminate(err)
}
defer libcontainer.DeleteState(dataPath)
// Sync with child
if err := syncPipe.ReadFromChild(); err != nil {
command.Process.Kill()
command.Wait()
return -1, err
// wait for the child process to fully complete and receive an error message
// if one was encoutered
var ierr *initError
if err := json.NewDecoder(parent).Decode(&ierr); err != nil && err != io.EOF {
return terminate(err)
}
if ierr != nil {
return terminate(ierr)
}
if startCallback != nil {
@ -108,7 +115,6 @@ func Exec(container *libcontainer.Config, stdin io.Reader, stdout, stderr io.Wri
return -1, err
}
}
return command.ProcessState.Sys().(syscall.WaitStatus).ExitStatus(), nil
}
@ -129,16 +135,6 @@ func DefaultCreateCommand(container *libcontainer.Config, console, dataPath, ini
"data_path=" + dataPath,
}
/*
TODO: move user and wd into env
if user != "" {
env = append(env, "user="+user)
}
if workingDir != "" {
env = append(env, "wd="+workingDir)
}
*/
command := exec.Command(init, append([]string{"init", "--"}, args...)...)
// make sure the process is executed inside the context of the rootfs
command.Dir = container.RootFs
@ -173,7 +169,7 @@ func SetupCgroups(container *libcontainer.Config, nspid int) (cgroups.ActiveCgro
// InitializeNetworking creates the container's network stack outside of the namespace and moves
// interfaces into the container's net namespaces if necessary
func InitializeNetworking(container *libcontainer.Config, nspid int, pipe *syncpipe.SyncPipe, networkState *network.NetworkState) error {
func InitializeNetworking(container *libcontainer.Config, nspid int, networkState *network.NetworkState) error {
for _, config := range container.Networks {
strategy, err := network.GetStrategy(config.Type)
if err != nil {
@ -183,18 +179,5 @@ func InitializeNetworking(container *libcontainer.Config, nspid int, pipe *syncp
return err
}
}
return pipe.SendToChild(networkState)
}
// GetNamespaceFlags parses the container's Namespaces options to set the correct
// flags on clone, unshare, and setns
func GetNamespaceFlags(namespaces map[string]bool) (flag int) {
for key, enabled := range namespaces {
if enabled {
if ns := GetNamespace(key); ns != nil {
flag |= ns.Value
}
}
}
return flag
return nil
}

View File

@ -3,6 +3,7 @@
package namespaces
import (
"encoding/json"
"fmt"
"io"
"os"
@ -15,7 +16,6 @@ import (
"github.com/docker/libcontainer/apparmor"
"github.com/docker/libcontainer/cgroups"
"github.com/docker/libcontainer/label"
"github.com/docker/libcontainer/syncpipe"
"github.com/docker/libcontainer/system"
)
@ -41,11 +41,11 @@ func ExecIn(container *libcontainer.Config, state *libcontainer.State, userArgs
}
}
pipe, err := syncpipe.NewSyncPipe()
parent, child, err := newInitPipe()
if err != nil {
return -1, err
}
defer pipe.Close()
defer parent.Close()
// Note: these are only used in non-tty mode
// if there is a tty for the container it will be opened within the namespace and the
@ -53,23 +53,28 @@ func ExecIn(container *libcontainer.Config, state *libcontainer.State, userArgs
cmd.Stdin = stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.ExtraFiles = []*os.File{pipe.Child()}
cmd.ExtraFiles = []*os.File{child}
if err := cmd.Start(); err != nil {
child.Close()
return -1, err
}
pipe.CloseChild()
child.Close()
terminate := func(terr error) (int, error) {
// TODO: log the errors for kill and wait
cmd.Process.Kill()
cmd.Wait()
return -1, terr
}
// Enter cgroups.
if err := EnterCgroups(state, cmd.Process.Pid); err != nil {
return -1, err
return terminate(err)
}
if err := pipe.SendToChild(container); err != nil {
cmd.Process.Kill()
cmd.Wait()
return -1, err
if err := json.NewEncoder(parent).Encode(container); err != nil {
return terminate(err)
}
if startCallback != nil {
@ -81,7 +86,6 @@ func ExecIn(container *libcontainer.Config, state *libcontainer.State, userArgs
return -1, err
}
}
return cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus(), nil
}

View File

@ -3,7 +3,9 @@
package namespaces
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"syscall"
@ -18,7 +20,6 @@ import (
"github.com/docker/libcontainer/network"
"github.com/docker/libcontainer/security/capabilities"
"github.com/docker/libcontainer/security/restrict"
"github.com/docker/libcontainer/syncpipe"
"github.com/docker/libcontainer/system"
"github.com/docker/libcontainer/user"
"github.com/docker/libcontainer/utils"
@ -30,11 +31,22 @@ import (
// and other options required for the new container.
// The caller of Init function has to ensure that the go runtime is locked to an OS thread
// (using runtime.LockOSThread) else system calls like setns called within Init may not work as intended.
func Init(container *libcontainer.Config, uncleanRootfs, consolePath string, syncPipe *syncpipe.SyncPipe, args []string) (err error) {
func Init(container *libcontainer.Config, uncleanRootfs, consolePath string, pipe *os.File, args []string) (err error) {
defer func() {
// if we have an error during the initialization of the container's init then send it back to the
// parent process in the form of an initError.
if err != nil {
syncPipe.ReportChildError(err)
// ensure that any data sent from the parent is consumed so it doesn't
// receive ECONNRESET when the child writes to the pipe.
ioutil.ReadAll(pipe)
if err := json.NewEncoder(pipe).Encode(initError{
Message: err.Error(),
}); err != nil {
panic(err)
}
}
// ensure that this pipe is always closed
pipe.Close()
}()
rootfs, err := utils.ResolveRootfs(uncleanRootfs)
@ -50,7 +62,7 @@ func Init(container *libcontainer.Config, uncleanRootfs, consolePath string, syn
// We always read this as it is a way to sync with the parent as well
var networkState *network.NetworkState
if err := syncPipe.ReadFromParent(&networkState); err != nil {
if err := json.NewDecoder(pipe).Decode(&networkState); err != nil {
return err
}

38
namespaces/utils.go Normal file
View File

@ -0,0 +1,38 @@
// +build linux
package namespaces
import (
"os"
"syscall"
)
type initError struct {
Message string `json:"message,omitempty"`
}
func (i initError) Error() string {
return i.Message
}
// New returns a newly initialized Pipe for communication between processes
func newInitPipe() (parent *os.File, child *os.File, err error) {
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
if err != nil {
return nil, nil, err
}
return os.NewFile(uintptr(fds[1]), "parent"), os.NewFile(uintptr(fds[0]), "child"), nil
}
// GetNamespaceFlags parses the container's Namespaces options to set the correct
// flags on clone, unshare, and setns
func GetNamespaceFlags(namespaces map[string]bool) (flag int) {
for key, enabled := range namespaces {
if enabled {
if ns := GetNamespace(key); ns != nil {
flag |= ns.Value
}
}
}
return flag
}

View File

@ -8,7 +8,6 @@ import (
"github.com/codegangsta/cli"
"github.com/docker/libcontainer/namespaces"
"github.com/docker/libcontainer/syncpipe"
)
var (
@ -41,12 +40,8 @@ func initAction(context *cli.Context) {
log.Fatal(err)
}
syncPipe, err := syncpipe.NewSyncPipeFromFd(0, uintptr(pipeFd))
if err != nil {
log.Fatalf("unable to create sync pipe: %s", err)
}
if err := namespaces.Init(container, rootfs, console, syncPipe, []string(context.Args())); err != nil {
pipe := os.NewFile(uintptr(pipeFd), "pipe")
if err := namespaces.Init(container, rootfs, console, pipe, []string(context.Args())); err != nil {
log.Fatalf("unable to initialize for container: %s", err)
}
}

View File

@ -8,7 +8,6 @@ import (
"github.com/codegangsta/cli"
"github.com/docker/libcontainer"
"github.com/docker/libcontainer/syncpipe"
)
// rFunc is a function registration for calling after an execin
@ -59,16 +58,13 @@ func findUserArgs() []string {
// loadConfigFromFd loads a container's config from the sync pipe that is provided by
// fd 3 when running a process
func loadConfigFromFd() (*libcontainer.Config, error) {
syncPipe, err := syncpipe.NewSyncPipeFromFd(0, 3)
if err != nil {
return nil, err
}
pipe := os.NewFile(3, "pipe")
defer pipe.Close()
var config *libcontainer.Config
if err := syncPipe.ReadFromParent(&config); err != nil {
if err := json.NewDecoder(pipe).Decode(&config); err != nil {
return nil, err
}
return config, nil
}

View File

@ -1,105 +0,0 @@
package syncpipe
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"syscall"
)
// SyncPipe allows communication to and from the child processes
// to it's parent and allows the two independent processes to
// syncronize their state.
type SyncPipe struct {
parent, child *os.File
}
func NewSyncPipeFromFd(parentFd, childFd uintptr) (*SyncPipe, error) {
s := &SyncPipe{}
if parentFd > 0 {
s.parent = os.NewFile(parentFd, "parentPipe")
} else if childFd > 0 {
s.child = os.NewFile(childFd, "childPipe")
} else {
return nil, fmt.Errorf("no valid sync pipe fd specified")
}
return s, nil
}
func (s *SyncPipe) Child() *os.File {
return s.child
}
func (s *SyncPipe) Parent() *os.File {
return s.parent
}
func (s *SyncPipe) SendToChild(v interface{}) error {
data, err := json.Marshal(v)
if err != nil {
return err
}
s.parent.Write(data)
return syscall.Shutdown(int(s.parent.Fd()), syscall.SHUT_WR)
}
func (s *SyncPipe) ReadFromChild() error {
data, err := ioutil.ReadAll(s.parent)
if err != nil {
return err
}
if len(data) > 0 {
return fmt.Errorf("%s", data)
}
return nil
}
func (s *SyncPipe) ReadFromParent(v interface{}) error {
data, err := ioutil.ReadAll(s.child)
if err != nil {
return fmt.Errorf("error reading from sync pipe %s", err)
}
if len(data) > 0 {
if err := json.Unmarshal(data, v); err != nil {
return err
}
}
return nil
}
func (s *SyncPipe) ReportChildError(err error) {
// ensure that any data sent from the parent is consumed so it doesn't
// receive ECONNRESET when the child writes to the pipe.
ioutil.ReadAll(s.child)
s.child.Write([]byte(err.Error()))
s.CloseChild()
}
func (s *SyncPipe) Close() error {
if s.parent != nil {
s.parent.Close()
}
if s.child != nil {
s.child.Close()
}
return nil
}
func (s *SyncPipe) CloseChild() {
if s.child != nil {
s.child.Close()
s.child = nil
}
}

View File

@ -1,20 +0,0 @@
package syncpipe
import (
"os"
"syscall"
)
func NewSyncPipe() (s *SyncPipe, err error) {
s = &SyncPipe{}
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
if err != nil {
return nil, err
}
s.child = os.NewFile(uintptr(fds[0]), "child syncpipe")
s.parent = os.NewFile(uintptr(fds[1]), "parent syncpipe")
return s, nil
}

View File

@ -1,72 +0,0 @@
package syncpipe
import (
"fmt"
"syscall"
"testing"
)
type testStruct struct {
Name string
}
func TestSendErrorFromChild(t *testing.T) {
pipe, err := NewSyncPipe()
if err != nil {
t.Fatal(err)
}
defer func() {
if err := pipe.Close(); err != nil {
t.Fatal(err)
}
}()
childfd, err := syscall.Dup(int(pipe.Child().Fd()))
if err != nil {
t.Fatal(err)
}
childPipe, _ := NewSyncPipeFromFd(0, uintptr(childfd))
pipe.CloseChild()
pipe.SendToChild(nil)
expected := "something bad happened"
childPipe.ReportChildError(fmt.Errorf(expected))
childError := pipe.ReadFromChild()
if childError == nil {
t.Fatal("expected an error to be returned but did not receive anything")
}
if childError.Error() != expected {
t.Fatalf("expected %q but received error message %q", expected, childError.Error())
}
}
func TestSendPayloadToChild(t *testing.T) {
pipe, err := NewSyncPipe()
if err != nil {
t.Fatal(err)
}
defer func() {
if err := pipe.Close(); err != nil {
t.Fatal(err)
}
}()
expected := "libcontainer"
if err := pipe.SendToChild(testStruct{Name: expected}); err != nil {
t.Fatal(err)
}
var s *testStruct
if err := pipe.ReadFromParent(&s); err != nil {
t.Fatal(err)
}
if s.Name != expected {
t.Fatalf("expected name %q but received %q", expected, s.Name)
}
}