Merge pull request #1807 from giuseppe/notify-no-block
sd-notify: do not hang when NOTIFY_SOCKET is used with create
This commit is contained in:
commit
167e33ca50
136
notify_socket.go
136
notify_socket.go
|
@ -7,11 +7,13 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/opencontainers/runc/libcontainer"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
|
@ -27,12 +29,12 @@ func newNotifySocket(context *cli.Context, notifySocketHost string, id string) *
|
|||
}
|
||||
|
||||
root := filepath.Join(context.GlobalString("root"), id)
|
||||
path := filepath.Join(root, "notify.sock")
|
||||
socketPath := filepath.Join(root, "notify", "notify.sock")
|
||||
|
||||
notifySocket := ¬ifySocket{
|
||||
socket: nil,
|
||||
host: notifySocketHost,
|
||||
socketPath: path,
|
||||
socketPath: socketPath,
|
||||
}
|
||||
|
||||
return notifySocket
|
||||
|
@ -44,13 +46,19 @@ func (s *notifySocket) Close() error {
|
|||
|
||||
// If systemd is supporting sd_notify protocol, this function will add support
|
||||
// for sd_notify protocol from within the container.
|
||||
func (s *notifySocket) setupSpec(context *cli.Context, spec *specs.Spec) {
|
||||
mount := specs.Mount{Destination: s.host, Source: s.socketPath, Options: []string{"bind"}}
|
||||
func (s *notifySocket) setupSpec(context *cli.Context, spec *specs.Spec) error {
|
||||
pathInContainer := filepath.Join("/run/notify", path.Base(s.socketPath))
|
||||
mount := specs.Mount{
|
||||
Destination: path.Dir(pathInContainer),
|
||||
Source: path.Dir(s.socketPath),
|
||||
Options: []string{"bind", "nosuid", "noexec", "nodev", "ro"},
|
||||
}
|
||||
spec.Mounts = append(spec.Mounts, mount)
|
||||
spec.Process.Env = append(spec.Process.Env, fmt.Sprintf("NOTIFY_SOCKET=%s", s.host))
|
||||
spec.Process.Env = append(spec.Process.Env, fmt.Sprintf("NOTIFY_SOCKET=%s", pathInContainer))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *notifySocket) setupSocket() error {
|
||||
func (s *notifySocket) bindSocket() error {
|
||||
addr := net.UnixAddr{
|
||||
Name: s.socketPath,
|
||||
Net: "unixgram",
|
||||
|
@ -71,46 +79,92 @@ func (s *notifySocket) setupSocket() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// pid1 must be set only with -d, as it is used to set the new process as the main process
|
||||
// for the service in systemd
|
||||
func (s *notifySocket) run(pid1 int) {
|
||||
buf := make([]byte, 512)
|
||||
notifySocketHostAddr := net.UnixAddr{Name: s.host, Net: "unixgram"}
|
||||
func (s *notifySocket) setupSocketDirectory() error {
|
||||
return os.Mkdir(path.Dir(s.socketPath), 0755)
|
||||
}
|
||||
|
||||
func notifySocketStart(context *cli.Context, notifySocketHost, id string) (*notifySocket, error) {
|
||||
notifySocket := newNotifySocket(context, notifySocketHost, id)
|
||||
if notifySocket == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := notifySocket.bindSocket(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return notifySocket, nil
|
||||
}
|
||||
|
||||
func (n *notifySocket) waitForContainer(container libcontainer.Container) error {
|
||||
s, err := container.State()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return n.run(s.InitProcessPid)
|
||||
}
|
||||
|
||||
func (n *notifySocket) run(pid1 int) error {
|
||||
if n.socket == nil {
|
||||
return nil
|
||||
}
|
||||
notifySocketHostAddr := net.UnixAddr{Name: n.host, Net: "unixgram"}
|
||||
client, err := net.DialUnix("unixgram", nil, ¬ifySocketHostAddr)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
for {
|
||||
r, err := s.socket.Read(buf)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
var out bytes.Buffer
|
||||
for _, line := range bytes.Split(buf[0:r], []byte{'\n'}) {
|
||||
if bytes.HasPrefix(line, []byte("READY=")) {
|
||||
_, err = out.Write(line)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = out.Write([]byte{'\n'})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(time.Millisecond * 100)
|
||||
defer ticker.Stop()
|
||||
|
||||
_, err = client.Write(out.Bytes())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// now we can inform systemd to use pid1 as the pid to monitor
|
||||
if pid1 > 0 {
|
||||
newPid := fmt.Sprintf("MAINPID=%d\n", pid1)
|
||||
client.Write([]byte(newPid))
|
||||
}
|
||||
fileChan := make(chan []byte)
|
||||
go func() {
|
||||
for {
|
||||
buf := make([]byte, 4096)
|
||||
r, err := n.socket.Read(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
got := buf[0:r]
|
||||
// systemd-ready sends a single datagram with the state string as payload,
|
||||
// so we don't need to worry about partial messages.
|
||||
for _, line := range bytes.Split(got, []byte{'\n'}) {
|
||||
if bytes.HasPrefix(got, []byte("READY=")) {
|
||||
fileChan <- line
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
_, err := os.Stat(filepath.Join("/proc", strconv.Itoa(pid1)))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
case b := <-fileChan:
|
||||
var out bytes.Buffer
|
||||
_, err = out.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = out.Write([]byte{'\n'})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = client.Write(out.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// now we can inform systemd to use pid1 as the pid to monitor
|
||||
newPid := fmt.Sprintf("MAINPID=%d\n", pid1)
|
||||
client.Write([]byte(newPid))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,6 +70,7 @@ func (h *signalHandler) forward(process *libcontainer.Process, tty *tty, detach
|
|||
h.notifySocket.run(pid1)
|
||||
return 0, nil
|
||||
}
|
||||
h.notifySocket.run(os.Getpid())
|
||||
go h.notifySocket.run(0)
|
||||
}
|
||||
|
||||
|
@ -97,9 +98,6 @@ func (h *signalHandler) forward(process *libcontainer.Process, tty *tty, detach
|
|||
// status because we must ensure that any of the go specific process
|
||||
// fun such as flushing pipes are complete before we return.
|
||||
process.Wait()
|
||||
if h.notifySocket != nil {
|
||||
h.notifySocket.Close()
|
||||
}
|
||||
return e.status, nil
|
||||
}
|
||||
}
|
||||
|
|
13
start.go
13
start.go
|
@ -3,6 +3,7 @@ package main
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/opencontainers/runc/libcontainer"
|
||||
"github.com/urfave/cli"
|
||||
|
@ -31,7 +32,17 @@ your host.`,
|
|||
}
|
||||
switch status {
|
||||
case libcontainer.Created:
|
||||
return container.Exec()
|
||||
notifySocket, err := notifySocketStart(context, os.Getenv("NOTIFY_SOCKET"), container.ID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := container.Exec(); err != nil {
|
||||
return err
|
||||
}
|
||||
if notifySocket != nil {
|
||||
return notifySocket.waitForContainer(container)
|
||||
}
|
||||
return nil
|
||||
case libcontainer.Stopped:
|
||||
return errors.New("cannot start a container that has stopped")
|
||||
case libcontainer.Running:
|
||||
|
|
|
@ -408,7 +408,9 @@ func startContainer(context *cli.Context, spec *specs.Spec, action CtAct, criuOp
|
|||
|
||||
notifySocket := newNotifySocket(context, os.Getenv("NOTIFY_SOCKET"), id)
|
||||
if notifySocket != nil {
|
||||
notifySocket.setupSpec(context, spec)
|
||||
if err := notifySocket.setupSpec(context, spec); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
}
|
||||
|
||||
container, err := createContainer(context, id, spec)
|
||||
|
@ -417,10 +419,16 @@ func startContainer(context *cli.Context, spec *specs.Spec, action CtAct, criuOp
|
|||
}
|
||||
|
||||
if notifySocket != nil {
|
||||
err := notifySocket.setupSocket()
|
||||
err := notifySocket.setupSocketDirectory()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
if action == CT_ACT_RUN {
|
||||
err := notifySocket.bindSocket()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Support on-demand socket activation by passing file descriptors into the container init process.
|
||||
|
|
Loading…
Reference in New Issue