vendors: upgrade the containerd vendors

kata shimv2 needs containerd commit f05672357f,
so upgrade the vendored containerd to the latest.

Signed-off-by: Fupan Li <lifupan@gmail.com>
commit e4a3fd5565 (parent 4cc94b6063)
Author: Fupan Li
Date: 2018-12-11 02:50:36 +00:00

14 changed files with 250 additions and 116 deletions


@@ -31,7 +31,7 @@ import (
// ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 32
const bufferSize = 2048
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
@@ -47,7 +47,6 @@ func Reap() error {
Status: e.Status,
}
}
}
Default.Unlock()
return err
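
Not part of the diff: a minimal sketch of the intended call pattern for Reap,
under the assumption that the reaper package lives at the import path shown
(adjust it to wherever this file sits in the vendored tree).

package example

import (
	"fmt"
	"os"
	"os/signal"

	reaper "github.com/containerd/containerd/sys/reaper" // assumed path
	"golang.org/x/sys/unix"
)

// reapLoop forwards SIGCHLD into Reap, which collects every exited child and
// closes its wait channel, per the comment in the hunk above.
func reapLoop() {
	signals := make(chan os.Signal, 32)
	signal.Notify(signals, unix.SIGCHLD)
	for range signals {
		if err := reaper.Reap(); err != nil {
			fmt.Fprintln(os.Stderr, "reap:", err) // a real shim would log this
		}
	}
}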


@@ -53,11 +53,32 @@ type Shim interface {
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
}
// OptsKey is the context key for the Opts value.
type OptsKey struct{}
// Opts are context options associated with the shim invocation.
type Opts struct {
BundlePath string
Debug bool
}
// BinaryOpts allows the configuration of a shim's binary setup
type BinaryOpts func(*Config)
// Config of shim binary options provided by shim implementations
type Config struct {
// NoSubreaper disables setting the shim as a child subreaper
NoSubreaper bool
// NoReaper disables the shim binary from reaping any child process implicitly
NoReaper bool
}
var (
debugFlag bool
idFlag string
namespaceFlag string
socketFlag string
bundlePath string
addressFlag string
containerdBinaryFlag string
action string
@@ -68,6 +89,7 @@ func parseFlags() {
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&idFlag, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
@@ -107,32 +129,40 @@ func setLogger(ctx context.Context, id string) error {
}
// Run initializes and runs a shim server
func Run(id string, initFunc Init) {
if err := run(id, initFunc); err != nil {
func Run(id string, initFunc Init, opts ...BinaryOpts) {
var config Config
for _, o := range opts {
o(&config)
}
if err := run(id, initFunc, config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
os.Exit(1)
}
}
func run(id string, initFunc Init) error {
func run(id string, initFunc Init, config Config) error {
parseFlags()
setRuntime()
signals, err := setupSignals()
signals, err := setupSignals(config)
if err != nil {
return err
}
if err := subreaper(); err != nil {
return err
if !config.NoSubreaper {
if err := subreaper(); err != nil {
return err
}
}
publisher := &remoteEventsPublisher{
address: addressFlag,
containerdBinaryPath: containerdBinaryFlag,
noReaper: config.NoReaper,
}
if namespaceFlag == "" {
return fmt.Errorf("shim namespace cannot be empty")
}
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
service, err := initFunc(ctx, idFlag, publisher)
@@ -254,4 +284,5 @@ func dumpStacks(logger *logrus.Entry) {
type remoteEventsPublisher struct {
address string
containerdBinaryPath string
noReaper bool
}
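
Not part of the diff: a sketch of the consumer side, assuming this is the v2
shim package (github.com/containerd/containerd/runtime/v2/shim) and that Init
takes an events.Publisher, which mirrors the initFunc(ctx, idFlag, publisher)
call above; "New" and the kata runtime id are only examples.

package main

import (
	"context"

	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/runtime/v2/shim"
)

// New is a stand-in for a real shim constructor.
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
	// run() now attaches Opts to the context, so the bundle path and debug
	// flag can be read back out of ctx.
	if opts, ok := ctx.Value(shim.OptsKey{}).(shim.Opts); ok {
		_, _ = opts.BundlePath, opts.Debug
	}
	return nil, nil // a real shim returns its task service implementation
}

func main() {
	// BinaryOpts let a VM-based shim (kata, for example) opt out of the
	// default subreaper/reaper setup.
	shim.Run("io.containerd.kata.v2", New, func(cfg *shim.Config) {
		cfg.NoSubreaper = true
		cfg.NoReaper = true
	})
}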


@@ -39,9 +39,13 @@ import (
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
func setupSignals(config Config) (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE}
if !config.NoReaper {
smp = append(smp, unix.SIGCHLD)
}
signal.Notify(signals, smp...)
return signals, nil
}
@@ -87,7 +91,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
}
func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifo(context.Background(), "log", unix.O_WRONLY, 0700)
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
@@ -102,6 +106,15 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
if l.noReaper {
if err := cmd.Start(); err != nil {
return err
}
if err := cmd.Wait(); err != nil {
return errors.Wrap(err, "failed to publish event")
}
return nil
}
c, err := Default.Start(cmd)
if err != nil {
return err
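
Not part of the diff: the caller side of Publish, for context. The publisher
in the hunk above turns this into "containerd --address <addr> publish
--topic /tasks/exit --namespace <ns>" with the marshaled event on stdin. The
event type, topic constant, and "default" namespace are illustrative and come
from the containerd API, not from this diff.

package example

import (
	"context"
	"time"

	eventstypes "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/runtime"
)

// publishExit reports a task exit through whatever Publisher the shim was
// given (the remoteEventsPublisher above, in the binary shim case).
func publishExit(publisher events.Publisher, id string, pid, status uint32) error {
	ctx := namespaces.WithNamespace(context.Background(), "default")
	return publisher.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
		ContainerID: id,
		ID:          id,
		Pid:         pid,
		ExitStatus:  status,
		ExitedAt:    time.Now(),
	})
}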


@@ -40,7 +40,7 @@ import (
)
// setupSignals creates a new signal handler for all signals
func setupSignals() (chan os.Signal, error) {
func setupSignals(config Config) (chan os.Signal, error) {
signals := make(chan os.Signal, 32)
return signals, nil
}
@@ -119,21 +119,150 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal) error {
}
}
var _ = (io.WriterTo)(&blockingBuffer{})
var _ = (io.Writer)(&blockingBuffer{})
// blockingBuffer implements the `io.Writer` and `io.WriterTo` interfaces. Once
// `capacity` is reached the calls to `Write` will block until a successful call
// to `WriterTo` frees up the buffer space.
//
// Note: This has the same threading semantics as bytes.Buffer with no
// additional locking so multithreading is not supported.
type blockingBuffer struct {
c *sync.Cond
capacity int
buffer bytes.Buffer
}
func newBlockingBuffer(capacity int) *blockingBuffer {
return &blockingBuffer{
c: sync.NewCond(&sync.Mutex{}),
capacity: capacity,
}
}
func (bb *blockingBuffer) Len() int {
bb.c.L.Lock()
defer bb.c.L.Unlock()
return bb.buffer.Len()
}
func (bb *blockingBuffer) Write(p []byte) (int, error) {
if len(p) > bb.capacity {
return 0, errors.Errorf("len(p) (%d) too large for capacity (%d)", len(p), bb.capacity)
}
bb.c.L.Lock()
for bb.buffer.Len()+len(p) > bb.capacity {
bb.c.Wait()
}
defer bb.c.L.Unlock()
return bb.buffer.Write(p)
}
func (bb *blockingBuffer) WriteTo(w io.Writer) (int64, error) {
bb.c.L.Lock()
defer bb.c.L.Unlock()
defer bb.c.Signal()
return bb.buffer.WriteTo(w)
}
// deferredShimWriteLogger exists to solve the upstream logging issue presented
// by using Windows Named Pipes for logging. When containerd restarts it tries
// to reconnect to any shims. This means that the connection to the logger will
// be severed but when containerd starts up it should reconnect and start
// logging again. We abstract all of this logic behind what looks like a simple
// `io.Writer` that can reconnect in the lifetime and buffers logs while
// disconnected.
type deferredShimWriteLogger struct {
mu sync.Mutex
ctx context.Context
wg sync.WaitGroup
connected bool
aborted bool
buffer *blockingBuffer
l net.Listener
c net.Conn
conerr error
}
// beginAccept issues an accept to wait for a connection. Once a connection
// occurs drains any outstanding buffer. While draining the buffer any writes
// are blocked. If the buffer fails to fully drain due to a connection drop a
// call to `beginAccept` is re-issued waiting for another connection from
// containerd.
func (dswl *deferredShimWriteLogger) beginAccept() {
dswl.mu.Lock()
if dswl.connected {
return
}
dswl.mu.Unlock()
c, err := dswl.l.Accept()
if err == winio.ErrPipeListenerClosed {
dswl.mu.Lock()
dswl.aborted = true
dswl.l.Close()
dswl.conerr = errors.New("connection closed")
dswl.mu.Unlock()
return
}
dswl.mu.Lock()
dswl.connected = true
dswl.c = c
// Drain the buffer
if dswl.buffer.Len() > 0 {
_, err := dswl.buffer.WriteTo(dswl.c)
if err != nil {
// We lost our connection draining the buffer.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
}
}
dswl.mu.Unlock()
}
func (dswl *deferredShimWriteLogger) Write(p []byte) (int, error) {
dswl.wg.Wait()
if dswl.c == nil {
dswl.mu.Lock()
defer dswl.mu.Unlock()
if dswl.aborted {
return 0, dswl.conerr
}
return dswl.c.Write(p)
if dswl.connected {
// We have a connection. beginAccept would have drained the buffer so we just write our data to
// the connection directly.
written, err := dswl.c.Write(p)
if err != nil {
// We lost the connection.
dswl.connected = false
dswl.c.Close()
go dswl.beginAccept()
// We weren't able to write the full `p` bytes. Buffer the rest
if written != len(p) {
w, err := dswl.buffer.Write(p[written:])
if err != nil {
// We failed to buffer. Return this error
return written + w, err
}
written += w
}
}
return written, nil
}
// We are disconnected. Buffer the contents.
return dswl.buffer.Write(p)
}
// openLog on Windows acts as the server of the log pipe. This allows the
@@ -143,26 +272,17 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
if err != nil {
return nil, err
}
dswl := &deferredShimWriteLogger{
ctx: ctx,
buffer: newBlockingBuffer(64 * 1024), // 64KB,
}
l, err := winio.ListenPipe(fmt.Sprintf("\\\\.\\pipe\\containerd-shim-%s-%s-log", ns, id), nil)
if err != nil {
return nil, err
}
dswl := &deferredShimWriteLogger{
ctx: ctx,
}
// TODO: JTERRY75 - this will not work with restarts. Only the first
// connection will work and all +1 connections will return 'use of closed
// network connection'. Make this reconnect aware.
dswl.wg.Add(1)
go func() {
c, conerr := l.Accept()
if conerr != nil {
l.Close()
dswl.conerr = conerr
}
dswl.c = c
dswl.wg.Done()
}()
dswl.l = l
go dswl.beginAccept()
return dswl, nil
}
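
Not part of the diff: a small illustration of the blockingBuffer contract
added above, written as if in the same package so the unexported identifiers
and the existing bytes import are available.

// demoBlockingBuffer shows that Write blocks once capacity is reached and
// that a WriteTo drain signals the blocked writer so it can finish.
func demoBlockingBuffer() {
	bb := newBlockingBuffer(8)
	bb.Write([]byte("12345678")) // fills the buffer exactly to capacity

	done := make(chan struct{})
	go func() {
		bb.Write([]byte("9")) // blocks: 8 buffered + 1 exceeds capacity
		close(done)
	}()

	var out bytes.Buffer
	bb.WriteTo(&out) // drains the 8 bytes and signals the blocked writer
	<-done           // the pending one-byte write has now landed in bb
}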


@@ -24,6 +24,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/containerd/containerd/namespaces"
@@ -32,6 +33,8 @@ import (
const shimBinaryFormat = "containerd-shim-%s-%s"
var runtimePaths sync.Map
// Command returns the shim command with the provided args and configuration
func Command(ctx context.Context, runtime, containerdAddress, path string, cmdArgs ...string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
@@ -49,19 +52,33 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr
}
args = append(args, cmdArgs...)
name := BinaryName(runtime)
if name == "" {
return nil, fmt.Errorf("invalid runtime name %s, correct runtime name should format like io.containerd.runc.v1", runtime)
}
var cmdPath string
var lerr error
if cmdPath, lerr = exec.LookPath(name); lerr != nil {
if eerr, ok := lerr.(*exec.Error); ok {
if eerr.Err == exec.ErrNotFound {
return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name)
cmdPathI, cmdPathFound := runtimePaths.Load(name)
if cmdPathFound {
cmdPath = cmdPathI.(string)
} else {
var lerr error
if cmdPath, lerr = exec.LookPath(name); lerr != nil {
if eerr, ok := lerr.(*exec.Error); ok {
if eerr.Err == exec.ErrNotFound {
return nil, errors.Wrapf(os.ErrNotExist, "runtime %q binary not installed %q", runtime, name)
}
}
}
cmdPath, err = filepath.Abs(cmdPath)
if err != nil {
return nil, err
}
if cmdPathI, cmdPathFound = runtimePaths.LoadOrStore(name, cmdPath); cmdPathFound {
// We didn't store cmdPath we loaded an already cached value. Use it.
cmdPath = cmdPathI.(string)
}
}
cmdPath, err = filepath.Abs(cmdPath)
if err != nil {
return nil, err
}
cmd := exec.Command(cmdPath, args...)
cmd.Dir = path
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
@@ -69,10 +86,15 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, cmdAr
return cmd, nil
}
// BinaryName returns the shim binary name from the runtime name
// BinaryName returns the shim binary name from the runtime name,
// an empty string return means the runtime name is invalid
func BinaryName(runtime string) string {
// runtime name should format like $prefix.name.version
parts := strings.Split(runtime, ".")
// TODO: add validation for runtime
if len(parts) < 2 {
return ""
}
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
}
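
Not part of the diff: an illustrative test of the new BinaryName validation,
written as if in the same package (it only needs the standard "testing"
import).

func TestBinaryNameExamples(t *testing.T) {
	if got := BinaryName("io.containerd.runc.v1"); got != "containerd-shim-runc-v1" {
		t.Fatalf("unexpected name %q", got)
	}
	if got := BinaryName("io.containerd.kata.v2"); got != "containerd-shim-kata-v2" {
		t.Fatalf("unexpected name %q", got)
	}
	// fewer than two dot-separated parts is now reported as invalid via ""
	if got := BinaryName("runc"); got != "" {
		t.Fatalf("expected empty string, got %q", got)
	}
}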


@@ -51,11 +51,12 @@ func SocketAddress(ctx context.Context, id string) (string, error) {
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn
var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now()
for {
remaining := timeout - time.Now().Sub(start)
if remaining <= 0 {
lastError = errors.Errorf("timed out waiting for npipe %s", address)
lastError = timedOutError
break
}
c, lastError = winio.DialPipe(address, &remaining)
@@ -65,6 +66,15 @@ func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
if !os.IsNotExist(lastError) {
break
}
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. We use the passed in timeout for the
// `DialPipe` timeout if the pipe exists however to give the pipe time
// to `Accept` the connection.
if time.Now().Sub(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond)
}
return c, lastError
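
Not part of the diff: a sketch of a caller, as if in the same package,
showing the effect of the new 5-second cap (the 30-second budget is just an
example value).

// dialShim connects to a shim's pipe with a generous budget. If nothing is
// serving the pipe, AnonDialer now gives up after roughly 5 seconds instead
// of burning the whole budget; if the pipe exists, DialPipe gets the
// remaining time for the server to Accept.
func dialShim(ctx context.Context, id string) (net.Conn, error) {
	address, err := SocketAddress(ctx, id)
	if err != nil {
		return nil, err
	}
	return AnonDialer(address, 30*time.Second)
}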