runtime: refactor commandline code directory

Move all command line code to `cmd` and move containerd-shim-v2 to pkg.

Fixes: #2627
Signed-off-by: Peng Tao <bergwolf@hyper.sh>
This commit is contained in:
Peng Tao
2021-09-14 12:09:08 +08:00
parent 7bf96d2457
commit 4f7cc18622
77 changed files with 32 additions and 33 deletions

View File

@@ -0,0 +1,70 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"io"
"time"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/opencontainers/runtime-spec/specs-go"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
)
// container tracks the shim-side state of a single Kata container:
// its OCI spec, stdio configuration, exec sessions and lifecycle
// status.
type container struct {
	// s is the shim service that owns this container.
	s *service
	// ttyio carries the container process's IO streams.
	ttyio *ttyIO
	// spec is the container's OCI runtime specification.
	spec *specs.Spec
	// exitTime records when the container process exited.
	exitTime time.Time
	// execs indexes auxiliary exec sessions by exec ID.
	execs map[string]*exec
	// exitIOch signals IO teardown for the container — presumably
	// closed once IO is reclaimed; confirm in the stream-handling code.
	exitIOch chan struct{}
	// stdinPipe is the write end feeding the container's stdin.
	stdinPipe io.WriteCloser
	// stdinCloser signals that stdin has been closed.
	stdinCloser chan struct{}
	// exitCh delivers the container's exit code (buffered, size 1 —
	// see newContainer).
	exitCh chan uint32
	// id is the container ID from the create request.
	id string
	// stdin, stdout and stderr are the stream paths copied from the
	// CreateTaskRequest.
	stdin  string
	stdout string
	stderr string
	// bundle is the OCI bundle directory path.
	bundle string
	// cType distinguishes sandbox (pause) containers from regular ones.
	cType vc.ContainerType
	// exit holds the container's exit code.
	exit uint32
	// status is the containerd task status of the container.
	status task.Status
	// terminal is true when the container was created with a TTY.
	terminal bool
	// mounted is true when the shim mounted the rootfs and therefore
	// must unmount it on delete.
	mounted bool
}
// newContainer builds the shim-side container object for a create
// request. A nil request is rejected with a grpc InvalidArgument
// error; a nil spec is replaced by an empty one so later code (and
// tests) never dereference a nil pointer.
func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) {
	if r == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, " CreateTaskRequest points to nil")
	}

	// Guard against dereferencing a nil spec, notably in tests.
	if spec == nil {
		spec = &specs.Spec{}
	}

	return &container{
		s:           s,
		spec:        spec,
		id:          r.ID,
		bundle:      r.Bundle,
		stdin:       r.Stdin,
		stdout:      r.Stdout,
		stderr:      r.Stderr,
		terminal:    r.Terminal,
		cType:       containerType,
		execs:       make(map[string]*exec),
		status:      task.StatusCreated,
		exitIOch:    make(chan struct{}),
		exitCh:      make(chan uint32, 1),
		stdinCloser: make(chan struct{}),
		mounted:     mounted,
	}, nil
}

View File

@@ -0,0 +1,41 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"testing"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/stretchr/testify/assert"
)
func TestNewContainer(t *testing.T) {
assert := assert.New(t)
_, err := newContainer(nil, nil, "", nil, false)
assert.Error(err)
}
func TestGetExec(t *testing.T) {
assert := assert.New(t)
r := &taskAPI.CreateTaskRequest{}
c, err := newContainer(nil, r, "", nil, true)
assert.NoError(err)
_, err = c.getExec("")
assert.Error(err)
c.execs = make(map[string]*exec)
_, err = c.getExec("")
assert.Error(err)
c.execs[TestID] = &exec{}
_, err = c.getExec(TestID)
assert.NoError(err)
}

View File

@@ -0,0 +1,261 @@
// Copyright (c) 2014,2015,2016 Docker, Inc.
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"os"
"path/filepath"
containerd_types "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/mount"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/typeurl"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
// only register the proto type
crioption "github.com/containerd/containerd/pkg/runtimeoptions/v1"
_ "github.com/containerd/containerd/runtime/linux/runctypes"
_ "github.com/containerd/containerd/runtime/v2/runc/options"
oldcrioption "github.com/containerd/cri-containerd/pkg/api/runtimeoptions/v1"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils/katatrace"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/compatoci"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
)
// create handles a CreateTaskRequest for either a sandbox (pause)
// container or a regular container inside the existing sandbox. It
// loads the OCI spec from the bundle, mounts the rootfs when needed,
// drives katautils to create the sandbox/container and returns the
// tracked container object.
//
// Fix: the original `sandbox, _, err := katautils.CreateSandbox(...)`
// (and the following `pid, err := ...`) used `:=`, creating a new,
// case-scoped err that shadowed the function-scope err captured by the
// deferred rootfs-unmount cleanup. A failure in those calls therefore
// left the rootfs mounted. The calls below assign into distinctly
// named errors and propagate them to the outer err before returning.
func create(ctx context.Context, s *service, r *taskAPI.CreateTaskRequest) (*container, error) {
	rootFs := vc.RootFs{}
	// containerd passes at most one rootfs mount for the container.
	if len(r.Rootfs) == 1 {
		m := r.Rootfs[0]
		rootFs.Source = m.Source
		rootFs.Type = m.Type
		rootFs.Options = m.Options
	}

	detach := !r.Terminal
	ociSpec, bundlePath, err := loadSpec(r)
	if err != nil {
		return nil, err
	}

	containerType, err := oci.ContainerType(*ociSpec)
	if err != nil {
		return nil, err
	}

	disableOutput := noNeedForOutput(detach, ociSpec.Process.Terminal)
	rootfs := filepath.Join(r.Bundle, "rootfs")

	switch containerType {
	case vc.PodSandbox:
		// Only one sandbox may exist per shim instance.
		if s.sandbox != nil {
			return nil, fmt.Errorf("cannot create another sandbox in sandbox: %s", s.sandbox.ID())
		}

		s.config, err = loadRuntimeConfig(s, r, ociSpec.Annotations)
		if err != nil {
			return nil, err
		}

		// create tracer
		// This is the earliest location we can create the tracer because we must wait
		// until the runtime config is loaded
		jaegerConfig := &katatrace.JaegerConfig{
			JaegerEndpoint: s.config.JaegerEndpoint,
			JaegerUser:     s.config.JaegerUser,
			JaegerPassword: s.config.JaegerPassword,
		}
		_, err = katatrace.CreateTracer("kata", jaegerConfig)
		if err != nil {
			return nil, err
		}

		// create root span; stored on the service so later requests
		// attach to it.
		rootSpan, newCtx := katatrace.Trace(s.ctx, shimLog, "root span", shimTracingTags)
		s.rootCtx = newCtx
		defer rootSpan.End()

		// create span for this request
		span, newCtx := katatrace.Trace(s.rootCtx, shimLog, "create", shimTracingTags)
		s.ctx = newCtx
		defer span.End()

		if rootFs.Mounted, err = checkAndMount(s, r); err != nil {
			return nil, err
		}

		// Undo the rootfs mount if any later step fails; relies on the
		// function-scope err, so nothing below may shadow it.
		defer func() {
			if err != nil && rootFs.Mounted {
				if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
					shimLog.WithField("container-type", containerType).WithError(err2).Warn("failed to cleanup rootfs mount")
				}
			}
		}()

		katautils.HandleFactory(ctx, vci, s.config)

		// Pass service's context instead of local ctx to CreateSandbox(), since local
		// ctx will be canceled after this rpc service call, but the sandbox will live
		// across multiple rpc service calls.
		//
		sandbox, _, cerr := katautils.CreateSandbox(s.ctx, vci, *ociSpec, *s.config, rootFs, r.ID, bundlePath, "", disableOutput, false)
		if cerr != nil {
			// Propagate into err so the deferred cleanup fires.
			err = cerr
			return nil, err
		}
		s.sandbox = sandbox

		pid, perr := s.sandbox.GetHypervisorPid()
		if perr != nil {
			err = perr
			return nil, err
		}
		s.hpid = uint32(pid)

		go s.startManagementServer(ctx, ociSpec)

	case vc.PodContainer:
		span, ctx := katatrace.Trace(s.ctx, shimLog, "create", shimTracingTags)
		defer span.End()

		// A regular container needs the sandbox to exist already.
		if s.sandbox == nil {
			return nil, fmt.Errorf("BUG: Cannot start the container, since the sandbox hasn't been created")
		}

		if rootFs.Mounted, err = checkAndMount(s, r); err != nil {
			return nil, err
		}

		// Undo the rootfs mount if container creation fails.
		defer func() {
			if err != nil && rootFs.Mounted {
				if err2 := mount.UnmountAll(rootfs, 0); err2 != nil {
					shimLog.WithField("container-type", containerType).WithError(err2).Warn("failed to cleanup rootfs mount")
				}
			}
		}()

		_, err = katautils.CreateContainer(ctx, s.sandbox, *ociSpec, rootFs, r.ID, bundlePath, "", disableOutput)
		if err != nil {
			return nil, err
		}
	}

	container, err := newContainer(s, r, containerType, ociSpec, rootFs.Mounted)
	if err != nil {
		return nil, err
	}

	return container, nil
}
// loadSpec validates the request's bundle and parses its OCI
// config.json, returning the spec together with the resolved bundle
// path.
func loadSpec(r *taskAPI.CreateTaskRequest) (*specs.Spec, string, error) {
	// The bundle must satisfy the OCI runtime spec's MUST / MUST NOT
	// requirements.
	bundlePath, err := validBundle(r.ID, r.Bundle)
	if err != nil {
		return nil, "", err
	}

	spec, err := compatoci.ParseConfigJSON(bundlePath)
	if err != nil {
		return nil, "", err
	}

	return &spec, bundlePath, nil
}
// loadRuntimeConfig resolves the runtime configuration path and loads
// it, honoring this override order (high to low):
// 1. podsandbox annotation
// 2. shimv2 create task option
// 3. KATA_CONF_FILE environment variable
func loadRuntimeConfig(s *service, r *taskAPI.CreateTaskRequest, anno map[string]string) (*oci.RuntimeConfig, error) {
	// Reuse a configuration that was already resolved (or predefined
	// by a unit test).
	if s.config != nil {
		return s.config, nil
	}

	configPath := oci.GetSandboxConfigPath(anno)
	if configPath == "" && r.Options != nil {
		v, err := typeurl.UnmarshalAny(r.Options)
		if err != nil {
			return nil, err
		}
		switch opt := v.(type) {
		case *crioption.Options:
			configPath = opt.ConfigPath
		case *oldcrioption.Options:
			// Some versions of containerd, such as 1.4.3 and 1.4.4,
			// still send runtime options from
			// github.com/containerd/cri-containerd/pkg/api/runtimeoptions/v1.
			// Accept those instead of breaking compatibility.
			configPath = opt.ConfigPath
		default:
			// The cri default runtime handler may pass linux runc
			// options; those are deliberately ignored.
		}
	}

	// Last resort: the KATA_CONF_FILE environment variable.
	if configPath == "" {
		configPath = os.Getenv("KATA_CONF_FILE")
	}

	_, runtimeConfig, err := katautils.LoadConfiguration(configPath, false)
	if err != nil {
		return nil, err
	}

	// Cache the result unless a unit test predefined one meanwhile.
	if s.config == nil {
		s.config = &runtimeConfig
	}
	return &runtimeConfig, nil
}
// checkAndMount mounts the container rootfs under the bundle directory
// unless it is a block device that will be plugged directly into the
// guest. It reports whether a host-side mount was actually performed.
func checkAndMount(s *service, r *taskAPI.CreateTaskRequest) (bool, error) {
	if len(r.Rootfs) == 1 {
		src := r.Rootfs[0].Source
		// A block-backed rootfs is hot-plugged into the VM instead of
		// being mounted on the host.
		if katautils.IsBlockDevice(src) && !s.config.HypervisorConfig.DisableBlockDeviceUse {
			return false, nil
		}
	}

	target := filepath.Join(r.Bundle, "rootfs")
	if err := doMount(r.Rootfs, target); err != nil {
		return false, err
	}
	return true, nil
}
// doMount creates the rootfs directory if needed and applies every
// requested mount onto it. An empty mount list is a no-op.
func doMount(mounts []*containerd_types.Mount, rootfs string) error {
	if len(mounts) == 0 {
		return nil
	}

	// Create the mount point directly instead of stat-ing first: the
	// previous Stat+Mkdir pair raced with concurrent creators and
	// silently dropped Stat errors other than "not exist". An
	// already-existing directory is fine.
	if err := os.Mkdir(rootfs, 0711); err != nil && !os.IsExist(err) {
		return err
	}

	for _, rm := range mounts {
		m := &mount.Mount{
			Type:    rm.Type,
			Source:  rm.Source,
			Options: rm.Options,
		}
		if err := m.Mount(rootfs); err != nil {
			return errors.Wrapf(err, "failed to mount rootfs component %v", m)
		}
	}
	return nil
}

View File

@@ -0,0 +1,493 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"
"github.com/containerd/containerd/namespaces"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
crioption "github.com/containerd/cri-containerd/pkg/api/runtimeoptions/v1"
"github.com/containerd/typeurl"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/assert"
ktu "github.com/kata-containers/kata-containers/src/runtime/pkg/katatestutils"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/compatoci"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
)
// TestCreateSandboxSuccess exercises the happy path of creating a
// sandbox (pause) container through the shim service.
func TestCreateSandboxSuccess(t *testing.T) {
	if tc.NotValid(ktu.NeedRoot()) {
		t.Skip(ktu.TestDisabledNeedRoot)
	}

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
		MockContainers: []*vcmock.Container{
			{MockID: testContainerID},
		},
	}
	testingImpl.CreateSandboxFunc = func(ctx context.Context, sandboxConfig vc.SandboxConfig) (vc.VCSandbox, error) {
		return sandbox, nil
	}
	defer func() {
		testingImpl.CreateSandboxFunc = nil
	}()

	dir, err := ioutil.TempDir("", "")
	assert.NoError(t, err)
	defer os.RemoveAll(dir)

	runtimeConfig, err := newTestRuntimeConfig(dir, testConsole, true)
	assert.NoError(t, err)

	bundleDir := filepath.Join(dir, "bundle")
	assert.NoError(t, makeOCIBundle(bundleDir))

	cfgFile := filepath.Join(bundleDir, "config.json")
	assert.True(t, katautils.FileExists(cfgFile))

	spec, err := compatoci.ParseConfigJSON(bundleDir)
	assert.NoError(t, err)

	// Force sandbox-type container.
	spec.Annotations = map[string]string{
		testContainerTypeAnnotation: testContainerTypeSandbox,
	}

	// A memory limit makes processCgroupsPath() consider the cgroup
	// part of the spec.
	limit := int64(1024 * 1024)
	spec.Linux.Resources.Memory = &specs.LinuxMemory{Limit: &limit}

	// Persist the modified spec before handing the bundle to Create.
	assert.NoError(t, writeOCIConfigFile(spec, cfgFile))

	s := &service{
		id:         testSandboxID,
		containers: make(map[string]*container),
		config:     &runtimeConfig,
		ctx:        context.Background(),
	}
	req := &taskAPI.CreateTaskRequest{
		ID:       testSandboxID,
		Bundle:   bundleDir,
		Terminal: true,
	}

	_, err = s.Create(namespaces.WithNamespace(context.Background(), "UnitTest"), req)
	assert.NoError(t, err)
}
// TestCreateSandboxFail checks that Create surfaces the vcmock error
// when no CreateSandboxFunc is installed on the mock implementation.
func TestCreateSandboxFail(t *testing.T) {
	if tc.NotValid(ktu.NeedRoot()) {
		t.Skip(ktu.TestDisabledNeedRoot)
	}

	dir, err := ioutil.TempDir("", "")
	assert.NoError(t, err)
	defer os.RemoveAll(dir)

	runtimeConfig, err := newTestRuntimeConfig(dir, testConsole, true)
	assert.NoError(t, err)

	bundleDir := filepath.Join(dir, "bundle")
	assert.NoError(t, makeOCIBundle(bundleDir))

	cfgFile := filepath.Join(bundleDir, "config.json")
	assert.True(t, katautils.FileExists(cfgFile))

	spec, err := compatoci.ParseConfigJSON(bundleDir)
	assert.NoError(t, err)
	assert.NoError(t, writeOCIConfigFile(spec, cfgFile))

	s := &service{
		id:         testSandboxID,
		containers: make(map[string]*container),
		config:     &runtimeConfig,
		ctx:        context.Background(),
	}
	req := &taskAPI.CreateTaskRequest{
		ID:       testSandboxID,
		Bundle:   bundleDir,
		Terminal: true,
	}

	_, err = s.Create(namespaces.WithNamespace(context.Background(), "UnitTest"), req)
	assert.Error(t, err)
	assert.True(t, vcmock.IsMockError(err))
}
// TestCreateSandboxConfigFail expects sandbox creation to fail with
// the vcmock "mock error".
//
// NOTE(review): the spec is edited below to carry an invalid CPU quota,
// but the edited spec is never written back to config.json (there is
// no writeOCIConfigFile call), so Create re-parses the untouched
// bundle. The asserted failure actually comes from the vcmock
// CreateSandboxFunc being unset — confirm whether persisting the
// invalid spec was intended.
func TestCreateSandboxConfigFail(t *testing.T) {
	if tc.NotValid(ktu.NeedRoot()) {
		t.Skip(ktu.TestDisabledNeedRoot)
	}

	assert := assert.New(t)

	tmpdir, err := ioutil.TempDir("", "")
	assert.NoError(err)
	defer os.RemoveAll(tmpdir)

	runtimeConfig, err := newTestRuntimeConfig(tmpdir, testConsole, true)
	assert.NoError(err)

	bundlePath := filepath.Join(tmpdir, "bundle")

	err = makeOCIBundle(bundlePath)
	assert.NoError(err)

	ociConfigFile := filepath.Join(bundlePath, "config.json")
	assert.True(katautils.FileExists(ociConfigFile))

	spec, err := compatoci.ParseConfigJSON(bundlePath)
	assert.NoError(err)

	quota := int64(0)
	limit := int64(0)

	spec.Linux.Resources.Memory = &specs.LinuxMemory{
		Limit: &limit,
	}

	// specify an invalid spec
	spec.Linux.Resources.CPU = &specs.LinuxCPU{
		Quota: &quota,
	}

	s := &service{
		id:         testSandboxID,
		containers: make(map[string]*container),
		config:     &runtimeConfig,
		ctx:        context.Background(),
	}

	req := &taskAPI.CreateTaskRequest{
		ID:       testSandboxID,
		Bundle:   bundlePath,
		Terminal: true,
	}

	ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
	_, err = s.Create(ctx, req)
	assert.Error(err)
	assert.True(vcmock.IsMockError(err))
}
func TestCreateContainerSuccess(t *testing.T) {
assert := assert.New(t)
sandbox := &vcmock.Sandbox{
MockID: testSandboxID,
CreateContainerFunc: func(containerConfig vc.ContainerConfig) (vc.VCContainer, error) {
return &vcmock.Container{}, nil
},
}
tmpdir, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(tmpdir)
runtimeConfig, err := newTestRuntimeConfig(tmpdir, testConsole, true)
assert.NoError(err)
bundlePath := filepath.Join(tmpdir, "bundle")
err = makeOCIBundle(bundlePath)
assert.NoError(err)
ociConfigFile := filepath.Join(bundlePath, "config.json")
assert.True(katautils.FileExists(ociConfigFile))
spec, err := compatoci.ParseConfigJSON(bundlePath)
assert.NoError(err)
// set expected container type and sandboxID
spec.Annotations = make(map[string]string)
spec.Annotations[testContainerTypeAnnotation] = testContainerTypeContainer
spec.Annotations[testSandboxIDAnnotation] = testSandboxID
// rewrite file
err = writeOCIConfigFile(spec, ociConfigFile)
assert.NoError(err)
s := &service{
id: testContainerID,
sandbox: sandbox,
containers: make(map[string]*container),
config: &runtimeConfig,
ctx: context.Background(),
}
req := &taskAPI.CreateTaskRequest{
ID: testContainerID,
Bundle: bundlePath,
Terminal: true,
}
ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
_, err = s.Create(ctx, req)
assert.NoError(err)
}
// TestCreateContainerFail verifies that creating a container without a
// sandbox fails with a non-mock error.
func TestCreateContainerFail(t *testing.T) {
	dir, err := ioutil.TempDir("", "")
	assert.NoError(t, err)
	defer os.RemoveAll(dir)

	runtimeConfig, err := newTestRuntimeConfig(dir, testConsole, true)
	assert.NoError(t, err)

	bundleDir := filepath.Join(dir, "bundle")
	assert.NoError(t, makeOCIBundle(bundleDir))

	cfgFile := filepath.Join(bundleDir, "config.json")
	assert.True(t, katautils.FileExists(cfgFile))

	spec, err := compatoci.ParseConfigJSON(bundleDir)
	assert.NoError(t, err)

	spec.Annotations = map[string]string{
		testContainerTypeAnnotation: testContainerTypeContainer,
		testSandboxIDAnnotation:     testSandboxID,
	}
	assert.NoError(t, writeOCIConfigFile(spec, cfgFile))

	// Deliberately no sandbox attached to the service.
	s := &service{
		id:         testContainerID,
		containers: make(map[string]*container),
		config:     &runtimeConfig,
		ctx:        context.Background(),
	}
	req := &taskAPI.CreateTaskRequest{
		ID:       testContainerID,
		Bundle:   bundleDir,
		Terminal: true,
	}

	_, err = s.Create(namespaces.WithNamespace(context.Background(), "UnitTest"), req)
	assert.Error(t, err)
	assert.False(t, vcmock.IsMockError(err))
}
func TestCreateContainerConfigFail(t *testing.T) {
assert := assert.New(t)
sandbox := &vcmock.Sandbox{
MockID: testSandboxID,
}
sandbox.CreateContainerFunc = func(conf vc.ContainerConfig) (vc.VCContainer, error) {
return &vcmock.Container{}, nil
}
defer func() {
sandbox.CreateContainerFunc = nil
}()
tmpdir, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(tmpdir)
runtimeConfig, err := newTestRuntimeConfig(tmpdir, testConsole, true)
assert.NoError(err)
bundlePath := filepath.Join(tmpdir, "bundle")
err = makeOCIBundle(bundlePath)
assert.NoError(err)
ociConfigFile := filepath.Join(bundlePath, "config.json")
assert.True(katautils.FileExists(ociConfigFile))
spec, err := compatoci.ParseConfigJSON(bundlePath)
assert.NoError(err)
// set the error containerType
spec.Annotations = make(map[string]string)
spec.Annotations[testContainerTypeAnnotation] = "errorType"
spec.Annotations[testSandboxIDAnnotation] = testSandboxID
err = writeOCIConfigFile(spec, ociConfigFile)
assert.NoError(err)
s := &service{
id: testContainerID,
sandbox: sandbox,
containers: make(map[string]*container),
config: &runtimeConfig,
ctx: context.Background(),
}
req := &taskAPI.CreateTaskRequest{
ID: testContainerID,
Bundle: bundlePath,
Terminal: true,
}
ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
_, err = s.Create(ctx, req)
assert.Error(err)
}
// createAllRuntimeConfigFiles writes a runtime TOML configuration plus
// the (non-empty) resource files it references under dir, and returns
// the path of the generated config file.
//
// NOTE(review): the hypervisor argument is only validated for
// non-emptiness; the generated configuration always uses "qemu".
func createAllRuntimeConfigFiles(dir, hypervisor string) (string, error) {
	if dir == "" {
		return "", fmt.Errorf("BUG: need directory")
	}
	if hypervisor == "" {
		return "", fmt.Errorf("BUG: need hypervisor")
	}

	hypervisorPath := path.Join(dir, "hypervisor")
	kernelPath := path.Join(dir, "kernel")
	imagePath := path.Join(dir, "image")
	shimPath := path.Join(dir, "shim")

	opts := ktu.RuntimeConfigOptions{
		Hypervisor:           "qemu",
		HypervisorPath:       hypervisorPath,
		KernelPath:           kernelPath,
		ImagePath:            imagePath,
		KernelParams:         "foo=bar xyz",
		MachineType:          "machineType",
		ShimPath:             shimPath,
		NetmonPath:           path.Join(dir, "netmon"),
		LogPath:              path.Join(dir, "logs", "runtime.log"),
		DisableBlock:         true,
		BlockDeviceDriver:    "virtio-scsi",
		EnableIOThreads:      true,
		HotplugVFIOOnRootBus: true,
		PCIeRootPort:         uint32(2),
		DisableNewNetNs:      false,
		SharedFS:             "virtio-9p",
		VirtioFSDaemon:       path.Join(dir, "virtiofsd"),
	}

	configPath := path.Join(dir, "runtime.toml")
	data := ktu.MakeRuntimeConfigFileData(opts)
	if err := ioutil.WriteFile(configPath, []byte(data), os.FileMode(0640)); err != nil {
		return "", err
	}

	// The referenced resources must exist and be non-empty.
	for _, file := range []string{hypervisorPath, kernelPath, imagePath, shimPath} {
		if err := ioutil.WriteFile(file, []byte("foo"), os.FileMode(0640)); err != nil {
			return "", err
		}
	}

	return configPath, nil
}
// TestCreateLoadRuntimeConfig exercises the config resolution order:
// annotation, then create-task option, then KATA_CONF_FILE.
func TestCreateLoadRuntimeConfig(t *testing.T) {
	dir, err := ioutil.TempDir("", "")
	assert.NoError(t, err)
	defer os.RemoveAll(dir)

	config, err := createAllRuntimeConfigFiles(dir, "qemu")
	assert.NoError(t, err)

	s := &service{
		id:  testSandboxID,
		ctx: context.Background(),
	}
	r := &taskAPI.CreateTaskRequest{}
	anno := make(map[string]string)

	// Point every source at a bogus path first.
	fakeConfig := "foobar"
	anno[vcAnnotations.SandboxConfigPathKey] = fakeConfig
	option := &crioption.Options{ConfigPath: fakeConfig}
	r.Options, err = typeurl.MarshalAny(option)
	assert.NoError(t, err)
	assert.NoError(t, os.Setenv("KATA_CONF_FILE", fakeConfig))
	defer os.Setenv("KATA_CONF_FILE", "")

	// With only fake paths available, loading must fail.
	_, err = loadRuntimeConfig(s, r, anno)
	assert.Error(t, err)

	// 1. podsandbox annotation wins.
	anno[vcAnnotations.SandboxConfigPathKey] = config
	_, err = loadRuntimeConfig(s, r, anno)
	assert.NoError(t, err)
	anno[vcAnnotations.SandboxConfigPathKey] = ""

	// 2. shimv2 create task option.
	option.ConfigPath = config
	r.Options, err = typeurl.MarshalAny(option)
	assert.NoError(t, err)
	_, err = loadRuntimeConfig(s, r, anno)
	assert.NoError(t, err)
	option.ConfigPath = ""
	r.Options, err = typeurl.MarshalAny(option)
	assert.NoError(t, err)

	// 3. environment variable.
	assert.NoError(t, os.Setenv("KATA_CONF_FILE", config))
	_, err = loadRuntimeConfig(s, r, anno)
	assert.NoError(t, err)
}

View File

@@ -0,0 +1,47 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"path"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/mount"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
)
// deleteContainer stops and removes a (non-sandbox) container from the
// sandbox, runs post-stop OCI hooks, tears down its rootfs mount and
// drops it from the service's container table.
func deleteContainer(ctx context.Context, s *service, c *container) error {
	if !c.cType.IsSandbox() {
		// Stop the container first unless it is already stopped; a
		// "not found" response is tolerated in both steps.
		if c.status != task.StatusStopped {
			if _, err := s.sandbox.StopContainer(ctx, c.id, false); err != nil && !isNotFound(err) {
				return err
			}
		}
		if _, err := s.sandbox.DeleteContainer(ctx, c.id); err != nil && !isNotFound(err) {
			return err
		}
	}

	// Post-stop hook failures are logged but not fatal, as defined in
	// the OCI runtime spec:
	// https://github.com/opencontainers/runtime-spec/blob/master/runtime.md#lifecycle
	if err := katautils.PostStopHooks(ctx, *c.spec, s.sandbox.ID(), c.bundle); err != nil {
		shimLog.WithError(err).Warn("Failed to run post-stop hooks")
	}

	if c.mounted {
		if err := mount.UnmountAll(path.Join(c.bundle, "rootfs"), 0); err != nil {
			shimLog.WithError(err).Warn("failed to cleanup rootfs mount")
		}
	}

	delete(s.containers, c.id)

	return nil
}

View File

@@ -0,0 +1,61 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/stretchr/testify/assert"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/compatoci"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
)
func TestDeleteContainerSuccessAndFail(t *testing.T) {
assert := assert.New(t)
sandbox := &vcmock.Sandbox{
MockID: testSandboxID,
}
rootPath, bundlePath := testConfigSetup(t)
defer os.RemoveAll(rootPath)
_, err := compatoci.ParseConfigJSON(bundlePath)
assert.NoError(err)
s := &service{
id: testSandboxID,
sandbox: sandbox,
containers: make(map[string]*container),
}
reqCreate := &taskAPI.CreateTaskRequest{
ID: testContainerID,
}
s.containers[testContainerID], err = newContainer(s, reqCreate, "", nil, true)
assert.NoError(err)
}
// testConfigSetup creates a temporary directory containing an OCI
// bundle with a generated config, returning (rootPath, bundlePath).
// The caller is responsible for removing rootPath.
func testConfigSetup(t *testing.T) (string, string) {
	tmpdir, err := ioutil.TempDir("", "")
	assert.NoError(t, err)

	bundlePath := filepath.Join(tmpdir, "bundle")
	assert.NoError(t, os.MkdirAll(bundlePath, testDirMode))
	assert.NoError(t, createOCIConfig(bundlePath))

	return tmpdir, bundlePath
}

View File

@@ -0,0 +1,70 @@
// Copyright (c) 2019 hyper.sh
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"strings"
"syscall"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/types"
)
// toGRPC maps a virtcontainers error onto a grpc status error, using
// the original error message as the description. Errors that are
// already grpc statuses are returned unchanged; unrecognized errors
// pass through unmapped.
func toGRPC(err error) error {
	if err == nil {
		return nil
	}

	if isGRPCError(err) {
		// error has already been mapped to grpc
		return err
	}

	err = errors.Cause(err)
	switch {
	case isInvalidArgument(err):
		// Use status.Error rather than status.Errorf so '%' characters
		// in the message are never interpreted as format verbs
		// (go vet's printf check flags the non-constant format string).
		return status.Error(codes.InvalidArgument, err.Error())
	case isNotFound(err):
		return status.Error(codes.NotFound, err.Error())
	}

	return err
}
// toGRPCf maps the error to grpc error codes, assembling the formatting string
// and combining it with the target error string.
func toGRPCf(err error, format string, args ...interface{}) error {
return toGRPC(errors.Wrapf(err, format, args...))
}
// isGRPCError reports whether err already carries a grpc status.
func isGRPCError(err error) bool {
	if _, ok := status.FromError(err); ok {
		return true
	}
	return false
}
// isInvalidArgument reports whether err is one of the virtcontainers
// "missing argument" sentinels or EINVAL.
func isInvalidArgument(err error) bool {
	switch err {
	case vc.ErrNeedSandbox, vc.ErrNeedSandboxID,
		vc.ErrNeedContainerID, vc.ErrNeedState,
		syscall.EINVAL:
		return true
	}
	return false
}
// isNotFound reports whether err denotes a missing container/resource,
// either via well-known sentinels or by message substring.
func isNotFound(err error) bool {
	switch err {
	case vc.ErrNoSuchContainer, syscall.ENOENT:
		return true
	}
	msg := err.Error()
	return strings.Contains(msg, "not found") || strings.Contains(msg, "not exist")
}
// isGRPCErrorCode reports whether err is a grpc status error carrying
// exactly the given code.
func isGRPCErrorCode(code codes.Code, err error) bool {
	if s, ok := status.FromError(err); ok && s != nil {
		return s.Code() == code
	}
	return false
}

View File

@@ -0,0 +1,40 @@
// Copyright (c) 2019 hyper.sh
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"errors"
"syscall"
"testing"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/types"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestToGRPC(t *testing.T) {
assert := assert.New(t)
for _, err := range []error{vc.ErrNeedSandbox, vc.ErrNeedSandboxID,
vc.ErrNeedContainerID, vc.ErrNeedState, syscall.EINVAL, vc.ErrNoSuchContainer, syscall.ENOENT} {
assert.False(isGRPCError(err))
err = toGRPC(err)
assert.True(isGRPCError(err))
err = toGRPC(err)
assert.True(isGRPCError(err))
err = toGRPCf(err, "appending")
assert.True(isGRPCError(err))
}
}
func TestIsGRPCErrorCode(t *testing.T) {
assert := assert.New(t)
assert.True(isGRPCErrorCode(codes.Unimplemented, status.New(codes.Unimplemented, "foobar").Err()))
assert.True(isGRPCErrorCode(codes.NotFound, status.New(codes.NotFound, "foobar").Err()))
assert.False(isGRPCErrorCode(codes.Unimplemented, errors.New("foobar")))
}

View File

@@ -0,0 +1,147 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"fmt"
"io"
"strings"
"time"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/typeurl"
googleProtobuf "github.com/gogo/protobuf/types"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/types"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// exec represents a single auxiliary process ("exec") running inside a
// container, together with its command description, stdio wiring and
// lifecycle state.
type exec struct {
	// container is the container this exec runs in.
	container *container
	// cmds describes the command: args, env, user, cwd, etc.
	cmds *types.Cmd
	// tty holds the requested stdio paths and terminal geometry.
	tty *tty
	// ttyio carries the live IO streams once connected.
	ttyio *ttyIO
	// stdinPipe is the write end feeding the process's stdin.
	stdinPipe io.WriteCloser
	// exitTime records when the process exited.
	exitTime time.Time
	// exitIOch signals IO teardown — presumably closed once the exec's
	// IO is reclaimed; confirm in the stream-handling code.
	exitIOch chan struct{}
	// stdinCloser signals that stdin has been closed.
	stdinCloser chan struct{}
	// exitCh delivers the process exit code (buffered, size 1 — see
	// newExec).
	exitCh chan uint32
	// id is the exec ID.
	id string
	// exitCode is the process exit code (initialized to exitCode255 in
	// newExec).
	exitCode int32
	// status is the containerd task status of this exec.
	status task.Status
}
// tty captures the stdio stream paths and terminal settings requested
// for a process.
type tty struct {
	// stdin, stdout and stderr are the stream paths for the process.
	stdin  string
	stdout string
	stderr string
	// height and width come from the process spec's ConsoleSize
	// (zero when unset — see newExec).
	height uint32
	width  uint32
	// terminal is true when the process wants a TTY.
	terminal bool
}
// getEnvs converts "KEY=VALUE" environment strings into EnvVar
// entries. A string without '=' yields an entry with an empty value.
func getEnvs(envs []string) []types.EnvVar {
	vcEnvs := []types.EnvVar{}
	for _, kv := range envs {
		pair := strings.SplitN(kv, "=", 2)
		switch len(pair) {
		case 2:
			vcEnvs = append(vcEnvs, types.EnvVar{Var: pair[0], Value: pair[1]})
		case 1:
			vcEnvs = append(vcEnvs, types.EnvVar{Var: pair[0], Value: ""})
		}
	}
	return vcEnvs
}
// newExec builds an exec session for container c from the OCI process
// spec carried in jspec, wiring up the requested stdio streams and
// terminal geometry. A nil payload or a payload of the wrong type is
// rejected with a grpc InvalidArgument error.
func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *googleProtobuf.Any) (*exec, error) {
	if jspec == nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "googleProtobuf.Any points to nil")
	}

	// Decode the OCI process description from the request payload.
	v, err := typeurl.UnmarshalAny(jspec)
	if err != nil {
		return nil, err
	}
	spec, ok := v.(*specs.Process)
	if !ok {
		return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "Get an invalid spec type")
	}

	var height, width uint32
	if cs := spec.ConsoleSize; cs != nil {
		height = uint32(cs.Height)
		width = uint32(cs.Width)
	}

	var extraGroups []string
	for _, gid := range spec.User.AdditionalGids {
		extraGroups = append(extraGroups, fmt.Sprintf("%d", gid))
	}

	return &exec{
		container: c,
		cmds: &types.Cmd{
			Args:                spec.Args,
			Envs:                getEnvs(spec.Env),
			User:                fmt.Sprintf("%d", spec.User.UID),
			PrimaryGroup:        fmt.Sprintf("%d", spec.User.GID),
			SupplementaryGroups: extraGroups,
			WorkDir:             spec.Cwd,
			Interactive:         terminal,
			Detach:              !terminal,
			NoNewPrivileges:     spec.NoNewPrivileges,
		},
		tty: &tty{
			stdin:    stdin,
			stdout:   stdout,
			stderr:   stderr,
			height:   height,
			width:    width,
			terminal: terminal,
		},
		exitCode:    exitCode255,
		exitIOch:    make(chan struct{}),
		stdinCloser: make(chan struct{}),
		exitCh:      make(chan uint32, 1),
		status:      task.StatusCreated,
	}, nil
}
// getExec looks up the exec session with the given id, returning a
// grpc NotFound error when it is absent.
func (c *container) getExec(id string) (*exec, error) {
	// Indexing a nil map safely yields the zero value, so a single
	// lookup covers both "no exec map yet" and "unknown id".
	if e := c.execs[id]; e != nil {
		return e, nil
	}
	return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "exec does not exist %s", id)
}

View File

@@ -0,0 +1,50 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"testing"
"github.com/containerd/containerd/namespaces"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
"github.com/stretchr/testify/assert"
)
func TestExecNoSpecFail(t *testing.T) {
assert := assert.New(t)
sandbox := &vcmock.Sandbox{
MockID: testSandboxID,
}
s := &service{
id: testSandboxID,
sandbox: sandbox,
containers: make(map[string]*container),
}
reqCreate := &taskAPI.CreateTaskRequest{
ID: testContainerID,
}
var err error
s.containers[testContainerID], err = newContainer(s, reqCreate, "", nil, false)
assert.NoError(err)
reqExec := &taskAPI.ExecProcessRequest{
ID: testContainerID,
ExecID: testContainerID,
}
ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
_, err = s.Exec(ctx, reqExec)
assert.Error(err)
}

View File

@@ -0,0 +1,194 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
cgroupsv1 "github.com/containerd/cgroups/stats/v1"
"github.com/containerd/typeurl"
google_protobuf "github.com/gogo/protobuf/types"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
)
// marshalMetrics fetches the stats of containerID from the sandbox and
// encodes them as a typeurl Any payload of cgroups v1 metrics.
func marshalMetrics(ctx context.Context, s *service, containerID string) (*google_protobuf.Any, error) {
	stats, err := s.sandbox.StatsContainer(ctx, containerID)
	if err != nil {
		return nil, err
	}
	return typeurl.MarshalAny(statsToMetrics(&stats))
}
// statsToMetrics converts virtcontainers container stats into the
// containerd cgroups v1 metrics representation. Cgroup sections are
// filled only when cgroup stats are present; network stats are always
// copied.
func statsToMetrics(stats *vc.ContainerStats) *cgroupsv1.Metrics {
	metrics := &cgroupsv1.Metrics{}
	if cg := stats.CgroupStats; cg != nil {
		metrics.Hugetlb = setHugetlbStats(cg.HugetlbStats)
		metrics.Pids = setPidsStats(cg.PidsStats)
		metrics.CPU = setCPUStats(cg.CPUStats)
		metrics.Memory = setMemoryStats(cg.MemoryStats)
		metrics.Blkio = setBlkioStats(cg.BlkioStats)
	}
	metrics.Network = setNetworkStats(stats.NetworkStats)
	return metrics
}
// setHugetlbStats converts per-pagesize hugetlb stats. Map iteration
// order is random, so the resulting slice order is unspecified (as in
// the original implementation).
func setHugetlbStats(vcHugetlb map[string]vc.HugetlbStats) []*cgroupsv1.HugetlbStat {
	var out []*cgroupsv1.HugetlbStat
	for _, v := range vcHugetlb {
		stat := &cgroupsv1.HugetlbStat{
			Usage:   v.Usage,
			Max:     v.MaxUsage,
			Failcnt: v.Failcnt,
		}
		out = append(out, stat)
	}
	return out
}
// setPidsStats translates the pids cgroup stats.
func setPidsStats(vcPids vc.PidsStats) *cgroupsv1.PidsStat {
	return &cgroupsv1.PidsStat{
		Current: vcPids.Current,
		Limit:   vcPids.Limit,
	}
}
// setCPUStats translates the CPU usage and throttling cgroup stats.
func setCPUStats(vcCPU vc.CPUStats) *cgroupsv1.CPUStat {
	usage := &cgroupsv1.CPUUsage{
		Total:  vcCPU.CPUUsage.TotalUsage,
		Kernel: vcCPU.CPUUsage.UsageInKernelmode,
		User:   vcCPU.CPUUsage.UsageInUsermode,
	}
	// Append into a nil slice so the result neither aliases the input
	// nor turns an empty input into a non-nil slice.
	usage.PerCPU = append(usage.PerCPU, vcCPU.CPUUsage.PercpuUsage...)

	return &cgroupsv1.CPUStat{
		Usage: usage,
		Throttling: &cgroupsv1.Throttle{
			Periods:          vcCPU.ThrottlingData.Periods,
			ThrottledPeriods: vcCPU.ThrottlingData.ThrottledPeriods,
			ThrottledTime:    vcCPU.ThrottlingData.ThrottledTime,
		},
	}
}
// setMemoryStats translates the memory cgroup stats, including swap,
// kernel and kernel TCP accounting plus a few well-known raw counters.
func setMemoryStats(vcMemory vc.MemoryStats) *cgroupsv1.MemoryStat {
	// entry converts one memory usage record into its proto form.
	entry := func(d vc.MemoryData) *cgroupsv1.MemoryEntry {
		return &cgroupsv1.MemoryEntry{
			Limit:   d.Limit,
			Usage:   d.Usage,
			Max:     d.MaxUsage,
			Failcnt: d.Failcnt,
		}
	}

	memoryStats := &cgroupsv1.MemoryStat{
		Usage:     entry(vcMemory.Usage),
		Swap:      entry(vcMemory.SwapUsage),
		Kernel:    entry(vcMemory.KernelUsage),
		KernelTCP: entry(vcMemory.KernelTCPUsage),
	}

	// With use_hierarchy enabled the kernel reports the aggregated
	// values under "total_"-prefixed keys.
	prefix := ""
	if vcMemory.UseHierarchy {
		prefix = "total_"
	}
	memoryStats.Cache = vcMemory.Stats[prefix+"cache"]
	memoryStats.RSS = vcMemory.Stats[prefix+"rss"]
	memoryStats.MappedFile = vcMemory.Stats[prefix+"mapped_file"]

	// These counters are not guaranteed to be present, so only copy
	// them when reported.
	if v, ok := vcMemory.Stats["pgfault"]; ok {
		memoryStats.PgFault = v
	}
	if v, ok := vcMemory.Stats["pgmajfault"]; ok {
		memoryStats.PgMajFault = v
	}
	if v, ok := vcMemory.Stats["total_inactive_file"]; ok {
		memoryStats.TotalInactiveFile = v
	}

	return memoryStats
}
// setBlkioStats translates the block IO cgroup stats, copying each of
// the recursive per-device entry lists.
func setBlkioStats(vcBlkio vc.BlkioStats) *cgroupsv1.BlkIOStat {
	return &cgroupsv1.BlkIOStat{
		IoServiceBytesRecursive: copyBlkio(vcBlkio.IoServiceBytesRecursive),
		IoServicedRecursive:     copyBlkio(vcBlkio.IoServicedRecursive),
		IoQueuedRecursive:       copyBlkio(vcBlkio.IoQueuedRecursive),
		SectorsRecursive:        copyBlkio(vcBlkio.SectorsRecursive),
		IoServiceTimeRecursive:  copyBlkio(vcBlkio.IoServiceTimeRecursive),
		IoWaitTimeRecursive:     copyBlkio(vcBlkio.IoWaitTimeRecursive),
		IoMergedRecursive:       copyBlkio(vcBlkio.IoMergedRecursive),
		IoTimeRecursive:         copyBlkio(vcBlkio.IoTimeRecursive),
	}
}
// copyBlkio converts a slice of blkio stat entries into its proto form.
func copyBlkio(src []vc.BlkioStatEntry) []*cgroupsv1.BlkIOEntry {
	dst := make([]*cgroupsv1.BlkIOEntry, len(src))
	for i := range src {
		e := &src[i]
		dst[i] = &cgroupsv1.BlkIOEntry{
			Op:    e.Op,
			Major: e.Major,
			Minor: e.Minor,
			Value: e.Value,
		}
	}
	return dst
}
// setNetworkStats translates the per-interface network stats.
func setNetworkStats(vcNetwork []*vc.NetworkStats) []*cgroupsv1.NetworkStat {
	out := make([]*cgroupsv1.NetworkStat, len(vcNetwork))
	for i, ns := range vcNetwork {
		out[i] = &cgroupsv1.NetworkStat{
			Name:      ns.Name,
			RxBytes:   ns.RxBytes,
			RxPackets: ns.RxPackets,
			RxErrors:  ns.RxErrors,
			RxDropped: ns.RxDropped,
			TxBytes:   ns.TxBytes,
			TxPackets: ns.TxPackets,
			TxErrors:  ns.TxErrors,
			TxDropped: ns.TxDropped,
		}
	}
	return out
}

View File

@@ -0,0 +1,59 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"testing"
"github.com/containerd/cgroups/stats/v1"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
"github.com/stretchr/testify/assert"
)
// TestStatNetworkMetric checks that network stats reported by the
// sandbox are converted unchanged into cgroups v1 NetworkStat entries.
func TestStatNetworkMetric(t *testing.T) {
	assert := assert.New(t)
	var err error

	mockNetwork := []*vc.NetworkStats{
		{
			Name:    "test-network",
			RxBytes: 10,
			TxBytes: 20,
		},
	}
	expectedNetwork := []*v1.NetworkStat{
		{
			Name:    "test-network",
			RxBytes: 10,
			TxBytes: 20,
		},
	}
	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	// Have the mock sandbox report only network stats.
	sandbox.StatsContainerFunc = func(contID string) (vc.ContainerStats, error) {
		return vc.ContainerStats{
			NetworkStats: mockNetwork,
		}, nil
	}
	defer func() {
		sandbox.StatsContainerFunc = nil
	}()
	resp, err := sandbox.StatsContainer(context.Background(), testContainerID)
	assert.NoError(err)
	metrics := statsToMetrics(&resp)
	assert.Equal(expectedNetwork, metrics.Network)
}

View File

@@ -0,0 +1,203 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"testing"
"github.com/containerd/containerd/namespaces"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/types"
"github.com/stretchr/testify/assert"
)
// TestPauseContainerSuccess pauses a container that has been registered
// with the service and expects the request to succeed.
func TestPauseContainerSuccess(t *testing.T) {
	assert := assert.New(t)
	var err error

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	sandbox.PauseContainerFunc = func(contID string) error {
		return nil
	}
	defer func() {
		sandbox.PauseContainerFunc = nil
	}()
	// Report the container as running so pausing is a legal transition.
	sandbox.StatusContainerFunc = func(contID string) (vc.ContainerStatus, error) {
		return vc.ContainerStatus{
			ID:          testContainerID,
			Annotations: make(map[string]string),
			State: types.ContainerState{
				State: types.StateRunning,
			},
		}, nil
	}
	defer func() {
		sandbox.StatusContainerFunc = nil
	}()
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
	}
	reqCreate := &taskAPI.CreateTaskRequest{
		ID: testContainerID,
	}
	s.containers[testContainerID], err = newContainer(s, reqCreate, "", nil, true)
	assert.NoError(err)
	reqPause := &taskAPI.PauseRequest{
		ID: testContainerID,
	}
	ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
	_, err = s.Pause(ctx, reqPause)
	assert.NoError(err)
}
// TestPauseContainerFail expects Pause to fail because the container
// was never registered in the service's container map, even though the
// mock sandbox itself would accept the pause.
func TestPauseContainerFail(t *testing.T) {
	assert := assert.New(t)

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	sandbox.PauseContainerFunc = func(contID string) error {
		return nil
	}
	defer func() {
		sandbox.PauseContainerFunc = nil
	}()
	sandbox.StatusContainerFunc = func(contID string) (vc.ContainerStatus, error) {
		return vc.ContainerStatus{
			ID:          testContainerID,
			Annotations: make(map[string]string),
			State: types.ContainerState{
				State: types.StateRunning,
			},
		}, nil
	}
	defer func() {
		sandbox.StatusContainerFunc = nil
	}()
	// Note: no container is added to s.containers here.
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
	}
	reqPause := &taskAPI.PauseRequest{
		ID: testContainerID,
	}
	ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
	_, err := s.Pause(ctx, reqPause)
	assert.Error(err)
}
// TestResumeContainerSuccess resumes a container that has been
// registered with the service and expects the request to succeed.
func TestResumeContainerSuccess(t *testing.T) {
	assert := assert.New(t)
	var err error

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	sandbox.ResumeContainerFunc = func(contID string) error {
		return nil
	}
	defer func() {
		sandbox.ResumeContainerFunc = nil
	}()
	// Report the container as running for the status lookup done by Resume.
	sandbox.StatusContainerFunc = func(contID string) (vc.ContainerStatus, error) {
		return vc.ContainerStatus{
			ID:          testContainerID,
			Annotations: make(map[string]string),
			State: types.ContainerState{
				State: types.StateRunning,
			},
		}, nil
	}
	defer func() {
		sandbox.StatusContainerFunc = nil
	}()
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
	}
	reqCreate := &taskAPI.CreateTaskRequest{
		ID: testContainerID,
	}
	s.containers[testContainerID], err = newContainer(s, reqCreate, "", nil, true)
	assert.NoError(err)
	reqResume := &taskAPI.ResumeRequest{
		ID: testContainerID,
	}
	ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
	_, err = s.Resume(ctx, reqResume)
	assert.NoError(err)
}
// TestResumeContainerFail expects Resume to fail because the container
// was never registered in the service's container map, even though the
// mock sandbox itself would accept the resume.
func TestResumeContainerFail(t *testing.T) {
	assert := assert.New(t)

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	sandbox.ResumeContainerFunc = func(contID string) error {
		return nil
	}
	defer func() {
		sandbox.ResumeContainerFunc = nil
	}()
	sandbox.StatusContainerFunc = func(contID string) (vc.ContainerStatus, error) {
		return vc.ContainerStatus{
			ID:          testContainerID,
			Annotations: make(map[string]string),
			State: types.ContainerState{
				State: types.StateRunning,
			},
		}, nil
	}
	defer func() {
		sandbox.StatusContainerFunc = nil
	}()
	// Note: no container is added to s.containers here.
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
	}
	reqResume := &taskAPI.ResumeRequest{
		ID: testContainerID,
	}
	ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
	_, err := s.Resume(ctx, reqResume)
	assert.Error(err)
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,80 @@
// Copyright (c) 2021 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
ktu "github.com/kata-containers/kata-containers/src/runtime/pkg/katatestutils"
"github.com/stretchr/testify/assert"
)
// newService builds a minimal shim service suitable for unit tests.
// It never returns a non-nil error; the error result only mirrors the
// signature callers expect.
func newService(id string) (*service, error) {
	ctx, cancel := context.WithCancel(context.Background())

	return &service{
		id:         id,
		pid:        uint32(os.Getpid()),
		ctx:        ctx,
		containers: make(map[string]*container),
		events:     make(chan interface{}, chSize),
		ec:         make(chan exit, bufferSize),
		cancel:     cancel,
	}, nil
}
// TestServiceCreate verifies that Create rejects invalid container IDs
// with the expected error messages.
func TestServiceCreate(t *testing.T) {
	const badCIDErrorPrefix = "invalid container/sandbox ID"
	const blankCIDError = "ID cannot be blank"

	assert := assert.New(t)

	// Fix: the error from ioutil.TempDir was silently discarded; a
	// failed temp-dir creation would have surfaced later as a
	// confusing bundle error instead.
	tmpdir, err := ioutil.TempDir("", "")
	assert.NoError(err)
	defer os.RemoveAll(tmpdir)

	bundleDir := filepath.Join(tmpdir, "bundle")
	err = makeOCIBundle(bundleDir)
	assert.NoError(err)

	ctx := context.Background()

	s, err := newService("foo")
	assert.NoError(err)

	for i, d := range ktu.ContainerIDTestData {
		msg := fmt.Sprintf("test[%d]: %+v", i, d)

		// Only consider error scenarios as we are only testing invalid CIDs here.
		if d.Valid {
			continue
		}

		task := taskAPI.CreateTaskRequest{
			ID:     d.ID,
			Bundle: bundleDir,
		}

		_, err = s.Create(ctx, &task)
		assert.Error(err, msg)

		if d.ID == "" {
			assert.Equal(err.Error(), blankCIDError, msg)
		} else {
			assert.True(strings.HasPrefix(err.Error(), badCIDErrorPrefix), msg)
		}
	}
}

View File

@@ -0,0 +1,190 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"expvar"
"fmt"
"io"
"net/http"
"net/http/pprof"
"path/filepath"
"strconv"
"strings"
cdshim "github.com/containerd/containerd/runtime/v2/shim"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/grpc/codes"
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
)
var (
	// ifSupportAgentMetricsAPI is flipped to false once the agent
	// reports (via a NotFound gRPC error) that it does not implement
	// the metrics API, so later scrapes skip the agent call.
	ifSupportAgentMetricsAPI = true

	// shimMgtLog is the logger used by the shim management endpoint.
	shimMgtLog = shimLog.WithField("subsystem", "shim-management")
)
// agentURL writes the URL of the agent backing this sandbox, or an
// HTTP 500 carrying the error text when it cannot be determined.
func (s *service) agentURL(w http.ResponseWriter, r *http.Request) {
	addr, err := s.sandbox.GetAgentURL()
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		return
	}

	fmt.Fprint(w, addr)
}
// serveMetrics handles /metrics requests. It gathers shim and sandbox
// metrics, appends the agent metrics when the agent supports the
// metrics API, and finally triggers an asynchronous pod overhead
// collection so the next scrape can pick up the result.
func (s *service) serveMetrics(w http.ResponseWriter, r *http.Request) {

	// update metrics from sandbox
	s.sandbox.UpdateRuntimeMetrics()

	// update metrics for shim process
	updateShimMetrics()

	// metrics gathered by shim
	mfs, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		return
	}

	// encode the metrics
	encoder := expfmt.NewEncoder(w, expfmt.FmtText)
	for _, mf := range mfs {
		// Encoding errors are ignored: the response is best effort and
		// there is no way to report them to the scraper anyway.
		encoder.Encode(mf)
	}

	// if using an old agent, only collect shim/sandbox metrics.
	if !ifSupportAgentMetricsAPI {
		return
	}

	// get metrics from agent
	// can not pass context to serveMetrics, so use background context
	agentMetrics, err := s.sandbox.GetAgentMetrics(context.Background())
	if err != nil {
		shimMgtLog.WithError(err).Error("failed GetAgentMetrics")
		// Fix: corrected "supportted" typo in the log message below.
		if isGRPCErrorCode(codes.NotFound, err) {
			shimMgtLog.Warn("metrics API not supported by this agent.")
			ifSupportAgentMetricsAPI = false
			return
		}
	}

	// decode and parse metrics from agent
	list := decodeAgentMetrics(agentMetrics)

	// encode the metrics to output
	for _, mf := range list {
		encoder.Encode(mf)
	}

	// collect pod overhead metrics need sleep to get the changes of cpu/memory resources usage
	// so here only trigger the collect operation, and the data will be gathered
	// next time collection request from Prometheus server
	go s.setPodOverheadMetrics(context.Background())
}
// decodeAgentMetrics parses the Prometheus text format returned by the
// agent into metric families, prefixing Go runtime and process metrics
// with "kata_agent_" so they do not clash with the shim's own metrics.
func decodeAgentMetrics(body string) []*dto.MetricFamily {
	// decode agent metrics
	decoder := expfmt.NewDecoder(strings.NewReader(body), expfmt.FmtText)
	list := make([]*dto.MetricFamily, 0)

	for {
		mf := &dto.MetricFamily{}
		if err := decoder.Decode(mf); err != nil {
			// Fix: stop on any decode error, not only io.EOF. The text
			// decoder keeps returning the same error for malformed
			// input, so continuing the loop would spin forever.
			if err != io.EOF {
				shimMgtLog.WithError(err).Warn("failed to decode agent metrics")
			}
			break
		}

		// Metrics collected by Prometheus itself (prefixed by go_ and
		// process_) get a prefix added to avoid naming conflicts.
		// This only has an effect for the Go agent (Kata 1.x); the
		// Rust agent already creates process metrics with the
		// "process_" prefix.
		if mf.Name != nil && (strings.HasPrefix(*mf.Name, "go_") || strings.HasPrefix(*mf.Name, "process_")) {
			mf.Name = mutils.String2Pointer("kata_agent_" + *mf.Name)
		}
		list = append(list, mf)
	}

	return list
}
// startManagementServer starts the shim management HTTP server on an
// abstract unix socket derived from the sandbox ID. It serves /metrics
// and /agent-url (plus pprof endpoints when enabled), registers the
// shim and sandbox Prometheus collectors, and then blocks serving
// requests — callers are expected to run it in its own goroutine.
func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) {
	// metrics socket will under sandbox's bundle path
	metricsAddress := SocketAddress(s.id)

	listener, err := cdshim.NewSocket(metricsAddress)
	if err != nil {
		shimMgtLog.WithError(err).Error("failed to create listener")
		return
	}

	// write metrics address to filesystem
	if err := cdshim.WriteAddress("monitor_address", metricsAddress); err != nil {
		shimMgtLog.WithError(err).Errorf("failed to write metrics address")
		return
	}

	shimMgtLog.Info("kata management inited")

	// bind handler
	m := http.NewServeMux()
	m.Handle("/metrics", http.HandlerFunc(s.serveMetrics))
	m.Handle("/agent-url", http.HandlerFunc(s.agentURL))
	s.mountPprofHandle(m, ociSpec)

	// register shim metrics
	registerMetrics()

	// register sandbox metrics
	vc.RegisterMetrics()

	// start serve
	svr := &http.Server{Handler: m}
	svr.Serve(listener)
}
// mountPprofHandle registers the expvar and pprof debug endpoints on m.
// The endpoints are mounted when pprof is enabled either in the shim
// configuration or through the sandbox annotation.
func (s *service) mountPprofHandle(m *http.ServeMux, ociSpec *specs.Spec) {
	if !s.config.EnablePprof {
		// Fall back to the per-sandbox annotation.
		value, ok := ociSpec.Annotations[vcAnnotations.EnablePprof]
		if !ok {
			return
		}
		if enabled, err := strconv.ParseBool(value); err != nil || !enabled {
			return
		}
	}

	m.Handle("/debug/vars", expvar.Handler())
	m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
	m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
	m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
	m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
	m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
}
// SocketAddress returns the address of the abstract domain socket for
// communicating with the shim management endpoint of the given sandbox.
func SocketAddress(id string) string {
	sock := filepath.Join(string(filepath.Separator), "run", "vc", "sbs", id, "shim-monitor.sock")
	return "unix://" + sock
}

View File

@@ -0,0 +1,63 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
"github.com/stretchr/testify/assert"
)
// TestServeMetrics exercises the /metrics handler against a mock
// sandbox, first with agent metrics available and then with the agent
// returning an error.
func TestServeMetrics(t *testing.T) {
	assert := assert.New(t)

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
	}

	// NOTE(review): the same recorder is reused for both cases, so the
	// second response body is appended after the first one.
	rr := httptest.NewRecorder()
	r := &http.Request{}

	// case 1: normal
	sandbox.GetAgentMetricsFunc = func() (string, error) {
		return `# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 23
`, nil
	}
	defer func() {
		sandbox.GetAgentMetricsFunc = nil
	}()

	s.serveMetrics(rr, r)
	assert.Equal(200, rr.Code, "response code should be 200")
	body := rr.Body.String()
	// go_ metrics from the agent must be re-prefixed with kata_agent_.
	assert.Equal(true, strings.Contains(body, "kata_agent_go_threads 23\n"))

	// case 2: GetAgentMetricsFunc return error
	sandbox.GetAgentMetricsFunc = func() (string, error) {
		return "", fmt.Errorf("some error occurred")
	}
	s.serveMetrics(rr, r)
	assert.Equal(200, rr.Code, "response code should be 200")
	body = rr.Body.String()
	assert.Equal(true, len(strings.Split(body, "\n")) > 0)
}

View File

@@ -0,0 +1,211 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"time"
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/procfs"
)
// namespaceKatashim is the Prometheus namespace shared by all of the
// shim process metrics declared below.
const namespaceKatashim = "kata_shim"

var (
	// rpcDurationsHistogram tracks shim RPC latency, labelled by action.
	rpcDurationsHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: namespaceKatashim,
		Name:      "rpc_durations_histogram_milliseconds",
		Help:      "RPC latency distributions.",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 10),
	},
		[]string{"action"},
	)

	katashimThreads = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "threads",
		Help:      "Kata containerd shim v2 process threads.",
	})

	katashimProcStatus = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "proc_status",
		Help:      "Kata containerd shim v2 process status.",
	},
		[]string{"item"},
	)

	katashimProcStat = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "proc_stat",
		Help:      "Kata containerd shim v2 process statistics.",
	},
		[]string{"item"},
	)

	katashimNetdev = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "netdev",
		Help:      "Kata containerd shim v2 network devices statistics.",
	},
		[]string{"interface", "item"},
	)

	katashimIOStat = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "io_stat",
		Help:      "Kata containerd shim v2 process IO statistics.",
	},
		[]string{"item"},
	)

	katashimOpenFDs = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "fds",
		Help:      "Kata containerd shim v2 open FDs.",
	})

	// The two pod overhead gauges are refreshed asynchronously by
	// setPodOverheadMetrics.
	katashimPodOverheadCPU = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "pod_overhead_cpu",
		Help:      "Kata Pod overhead for CPU resources(percent).",
	})

	katashimPodOverheadMemory = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: namespaceKatashim,
		Name:      "pod_overhead_memory_in_bytes",
		Help:      "Kata Pod overhead for memory resources(bytes).",
	})
)
// registerMetrics registers all shim process collectors with the
// default Prometheus registry. It panics if any collector is
// registered twice.
func registerMetrics() {
	prometheus.MustRegister(
		rpcDurationsHistogram,
		katashimThreads,
		katashimProcStatus,
		katashimProcStat,
		katashimNetdev,
		katashimIOStat,
		katashimOpenFDs,
		katashimPodOverheadCPU,
		katashimPodOverheadMemory,
	)
}
// updateShimMetrics refreshes the gauges describing the kata shim
// process itself (open FDs, network devices, proc stat/status and IO).
// Collection of each item is best effort: individual procfs read
// failures are skipped rather than reported; only a failure to open
// the process itself is returned.
func updateShimMetrics() error {
	proc, err := procfs.Self()
	if err != nil {
		return err
	}

	// metrics about open FDs
	if fds, err := proc.FileDescriptorsLen(); err == nil {
		katashimOpenFDs.Set(float64(fds))
	}

	// network device metrics
	if netdev, err := proc.NetDev(); err == nil {
		// netdev: map[string]NetDevLine
		for _, v := range netdev {
			mutils.SetGaugeVecNetDev(katashimNetdev, v)
		}
	}

	// proc stat
	if procStat, err := proc.Stat(); err == nil {
		katashimThreads.Set(float64(procStat.NumThreads))
		mutils.SetGaugeVecProcStat(katashimProcStat, procStat)
	}

	// proc status
	if procStatus, err := proc.NewStatus(); err == nil {
		mutils.SetGaugeVecProcStatus(katashimProcStatus, procStatus)
	}

	// proc IO stat
	if ioStat, err := proc.IO(); err == nil {
		mutils.SetGaugeVecProcIO(katashimIOStat, ioStat)
	}

	return nil
}
// statsSandbox returns detailed stats for the sandbox itself together
// with the stats of every container currently attached to it. It fails
// on the first container whose stats cannot be retrieved.
func (s *service) statsSandbox(ctx context.Context) (vc.SandboxStats, []vc.ContainerStats, error) {
	sandboxStats, err := s.sandbox.Stats(ctx)
	if err != nil {
		return vc.SandboxStats{}, []vc.ContainerStats{}, err
	}

	containerStats := []vc.ContainerStats{}
	for _, cont := range s.sandbox.GetAllContainers() {
		stats, err := s.sandbox.StatsContainer(ctx, cont.ID())
		if err != nil {
			return vc.SandboxStats{}, []vc.ContainerStats{}, err
		}
		containerStats = append(containerStats, stats)
	}

	return sandboxStats, containerStats, nil
}
// calcOverhead computes the pod overhead — the host resources consumed
// that are not attributable to the guest containers — from two stats
// snapshots taken deltaTime nanoseconds apart. It returns the memory
// overhead in bytes and the CPU overhead in percent.
func calcOverhead(initialSandboxStats, finishSandboxStats vc.SandboxStats, initialContainerStats, finishContainersStats []vc.ContainerStats, deltaTime float64) (float64, float64) {
	hostInitCPU := initialSandboxStats.CgroupStats.CPUStats.CPUUsage.TotalUsage
	guestInitCPU := uint64(0)
	for _, cs := range initialContainerStats {
		guestInitCPU += cs.CgroupStats.CPUStats.CPUUsage.TotalUsage
	}

	hostFinalCPU := finishSandboxStats.CgroupStats.CPUStats.CPUUsage.TotalUsage
	guestFinalCPU := uint64(0)
	for _, cs := range finishContainersStats {
		guestFinalCPU += cs.CgroupStats.CPUStats.CPUUsage.TotalUsage
	}

	var guestMemoryUsage uint64
	for _, cs := range finishContainersStats {
		guestMemoryUsage += cs.CgroupStats.MemoryStats.Usage.Usage
	}

	hostMemoryUsage := finishSandboxStats.CgroupStats.MemoryStats.Usage.Usage

	cpuUsageGuest := float64(guestFinalCPU-guestInitCPU) / deltaTime * 100
	cpuUsageHost := float64(hostFinalCPU-hostInitCPU) / deltaTime * 100

	// Fix: convert to float64 before subtracting. The original computed
	// float64(hostMemoryUsage - guestMemoryUsage); when the summed guest
	// usage transiently exceeds the host reading, that uint64
	// subtraction wraps around to a huge bogus overhead value.
	memOverhead := float64(hostMemoryUsage) - float64(guestMemoryUsage)

	return memOverhead, cpuUsageHost - cpuUsageGuest
}
// getPodOverhead samples sandbox and container stats one second apart
// and returns the pod memory overhead (bytes) and CPU overhead
// (percent) derived from the two snapshots.
func (s *service) getPodOverhead(ctx context.Context) (float64, float64, error) {
	startTime := time.Now().UnixNano()
	initialSandboxStats, initialContainerStats, err := s.statsSandbox(ctx)
	if err != nil {
		return 0, 0, err
	}

	// Wait for 1 second to calculate CPU usage
	time.Sleep(time.Second * 1)

	endTime := time.Now().UnixNano()
	deltaTime := float64(endTime - startTime)

	finishSandboxStats, finishContainersStats, err := s.statsSandbox(ctx)
	if err != nil {
		return 0, 0, err
	}

	mem, cpu := calcOverhead(initialSandboxStats, finishSandboxStats, initialContainerStats, finishContainersStats, deltaTime)
	return mem, cpu, nil
}
// setPodOverheadMetrics refreshes the pod overhead gauges. It blocks
// for about a second while sampling, so callers usually run it in a
// separate goroutine.
func (s *service) setPodOverheadMetrics(ctx context.Context) error {
	mem, cpu, err := s.getPodOverhead(ctx)
	if err != nil {
		return err
	}

	katashimPodOverheadMemory.Set(mem)
	katashimPodOverheadCPU.Set(cpu)

	return nil
}

View File

@@ -0,0 +1,115 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"testing"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
"github.com/stretchr/testify/assert"
)
// getSandboxCPUFunc returns a mock sandbox Stats implementation that
// reports c seconds of total CPU usage (stored as nanoseconds) and m
// bytes of memory usage for the sandbox cgroup.
func getSandboxCPUFunc(c, m uint64) func() (vc.SandboxStats, error) {
	return func() (vc.SandboxStats, error) {
		return vc.SandboxStats{
			CgroupStats: vc.CgroupStats{
				CPUStats: vc.CPUStats{
					CPUUsage: vc.CPUUsage{
						TotalUsage: c * 1e9,
					},
				},
				MemoryStats: vc.MemoryStats{
					Usage: vc.MemoryData{
						Usage: m,
					},
				},
			},
			Cpus: 20,
		}, nil
	}
}
// getStatsContainerCPUFunc returns a mock StatsContainer implementation
// reporting per-container CPU usage (seconds, stored as nanoseconds)
// and memory usage (bytes): the "bar" container gets the bar* values,
// every other container gets the foo* values.
func getStatsContainerCPUFunc(fooCPU, barCPU, fooMem, barMem uint64) func(contID string) (vc.ContainerStats, error) {
	return func(contID string) (vc.ContainerStats, error) {
		vCPU := fooCPU
		vMem := fooMem
		if contID == "bar" {
			vCPU = barCPU
			vMem = barMem
		}
		return vc.ContainerStats{
			CgroupStats: &vc.CgroupStats{
				CPUStats: vc.CPUStats{
					CPUUsage: vc.CPUUsage{
						TotalUsage: vCPU * 1e9,
					},
				},
				MemoryStats: vc.MemoryStats{
					Usage: vc.MemoryData{
						Usage: vMem,
					},
				},
			},
		}, nil
	}
}
// TestStatsSandbox takes two stats snapshots from a mocked sandbox with
// two containers and checks both the raw values and the overhead
// computed by calcOverhead.
func TestStatsSandbox(t *testing.T) {
	assert := assert.New(t)

	sandbox := &vcmock.Sandbox{
		MockID:             testSandboxID,
		StatsFunc:          getSandboxCPUFunc(1000, 100000),
		StatsContainerFunc: getStatsContainerCPUFunc(100, 200, 10000, 20000),
		MockContainers: []*vcmock.Container{
			{
				MockID: "foo",
			},
			{
				MockID: "bar",
			},
		},
	}

	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
	}

	initialSandboxStats, initialContainerStats, err := s.statsSandbox(context.Background())
	assert.Nil(err)
	assert.Equal(uint64(1000*1e9), initialSandboxStats.CgroupStats.CPUStats.CPUUsage.TotalUsage)
	assert.Equal(2, len(initialContainerStats))
	assert.Equal(uint64(100*1e9), initialContainerStats[0].CgroupStats.CPUStats.CPUUsage.TotalUsage)
	assert.Equal(uint64(200*1e9), initialContainerStats[1].CgroupStats.CPUStats.CPUUsage.TotalUsage)
	assert.Equal(uint64(10000), initialContainerStats[0].CgroupStats.MemoryStats.Usage.Usage)
	assert.Equal(uint64(20000), initialContainerStats[1].CgroupStats.MemoryStats.Usage.Usage)

	// get the 2nd stats
	sandbox.StatsFunc = getSandboxCPUFunc(2000, 110000)
	sandbox.StatsContainerFunc = getStatsContainerCPUFunc(200, 400, 20000, 40000)

	finishSandboxStats, finishContainersStats, _ := s.statsSandbox(context.Background())

	// calc overhead
	mem, cpu := calcOverhead(initialSandboxStats, finishSandboxStats, initialContainerStats, finishContainersStats, 1e9)

	// 70000 = (host2.cpu - host1.cpu - (delta containers.1.cpu + delta containers.2.cpu)) * 100
	//       = (2000 - 1000 - (200 -100 + 400 - 200)) * 100
	//       = (1000 - 300) * 100
	//       = 70000
	assert.Equal(float64(70000), cpu)

	// 50000 = 110000 - sum(containers)
	//       = 110000 - (20000 + 40000)
	//       = 50000
	assert.Equal(float64(50000), mem)
}

View File

@@ -0,0 +1,148 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"github.com/containerd/containerd/api/types/task"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
)
// startContainer starts either the whole sandbox (when c is the sandbox
// container) or an individual container, runs the post-start OCI hooks,
// wires up the IO streams and spawns the goroutine waiting for the
// process to exit. On any failure it pushes exitCode255 to c.exitCh so
// the waiter goroutine is not left blocked forever.
func startContainer(ctx context.Context, s *service, c *container) (retErr error) {
	defer func() {
		if retErr != nil {
			// notify the wait goroutine to continue
			c.exitCh <- exitCode255
		}
	}()

	// start a container
	if c.cType == "" {
		err := fmt.Errorf("Bug, the container %s type is empty", c.id)
		return err
	}

	if s.sandbox == nil {
		err := fmt.Errorf("Bug, the sandbox hasn't been created for this container %s", c.id)
		return err
	}

	if c.cType.IsSandbox() {
		err := s.sandbox.Start(ctx)
		if err != nil {
			return err
		}
		// Start monitor after starting sandbox
		s.monitor, err = s.sandbox.Monitor(ctx)
		if err != nil {
			return err
		}
		go watchSandbox(ctx, s)

		// We use s.ctx(`ctx` derived from `s.ctx`) to check for cancellation of the
		// shim context and the context passed to startContainer for tracing.
		go watchOOMEvents(ctx, s)
	} else {
		_, err := s.sandbox.StartContainer(ctx, c.id)
		if err != nil {
			return err
		}
	}

	// Run post-start OCI hooks.
	err := katautils.EnterNetNS(s.sandbox.GetNetNs(), func() error {
		return katautils.PostStartHooks(ctx, *c.spec, s.sandbox.ID(), c.bundle)
	})
	if err != nil {
		// log warning and continue, as defined in oci runtime spec
		// https://github.com/opencontainers/runtime-spec/blob/master/runtime.md#lifecycle
		shimLog.WithError(err).Warn("Failed to run post-start hooks")
	}

	c.status = task.StatusRunning

	stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, c.id)
	if err != nil {
		return err
	}
	c.stdinPipe = stdin

	if c.stdin != "" || c.stdout != "" || c.stderr != "" {
		tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
		if err != nil {
			return err
		}
		c.ttyio = tty
		go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
	} else {
		// close the io exit channel, since there is no io for this container,
		// otherwise the following wait goroutine will hang on this channel.
		close(c.exitIOch)
		// close the stdin closer channel to notify that it's safe to close process's
		// io.
		close(c.stdinCloser)
	}

	go wait(ctx, s, c, "")

	return nil
}
// startExec launches a previously registered exec process inside the
// given container, applies any terminal size requested before the
// process started, wires up the IO streams and spawns the goroutine
// waiting for the process to exit. On any failure it pushes
// exitCode255 to the exec's exitCh so its waiter is not left blocked.
func startExec(ctx context.Context, s *service, containerID, execID string) (e *exec, retErr error) {
	// start an exec
	c, err := s.getContainer(containerID)
	if err != nil {
		return nil, err
	}
	execs, err := c.getExec(execID)
	if err != nil {
		return nil, err
	}

	defer func() {
		if retErr != nil {
			// notify the wait goroutine to continue
			execs.exitCh <- exitCode255
		}
	}()

	_, proc, err := s.sandbox.EnterContainer(ctx, containerID, *execs.cmds)
	if err != nil {
		err := fmt.Errorf("cannot enter container %s, with err %s", containerID, err)
		return nil, err
	}
	// The agent-generated token becomes the exec process ID.
	execs.id = proc.Token

	execs.status = task.StatusRunning

	if execs.tty.height != 0 && execs.tty.width != 0 {
		err = s.sandbox.WinsizeProcess(ctx, c.id, execs.id, execs.tty.height, execs.tty.width)
		if err != nil {
			return nil, err
		}
	}

	stdin, stdout, stderr, err := s.sandbox.IOStream(c.id, execs.id)
	if err != nil {
		return nil, err
	}
	execs.stdinPipe = stdin

	tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
	if err != nil {
		return nil, err
	}
	execs.ttyio = tty

	go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)

	go wait(ctx, s, c, execID)

	return execs, nil
}

View File

@@ -0,0 +1,179 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"testing"
"github.com/containerd/containerd/namespaces"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
"github.com/stretchr/testify/assert"
)
// TestStartStartSandboxSuccess starts the sandbox container through the
// shim Start API and expects it to succeed.
func TestStartStartSandboxSuccess(t *testing.T) {
	assert := assert.New(t)
	var err error

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	// The container status must identify this container as the sandbox.
	sandbox.StatusContainerFunc = func(contID string) (vc.ContainerStatus, error) {
		return vc.ContainerStatus{
			ID: sandbox.ID(),
			Annotations: map[string]string{
				vcAnnotations.ContainerTypeKey: string(vc.PodSandbox),
			},
		}, nil
	}
	defer func() {
		sandbox.StatusContainerFunc = nil
	}()
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
		ctx:        namespaces.WithNamespace(context.Background(), "UnitTest"),
	}
	reqCreate := &taskAPI.CreateTaskRequest{
		ID: testSandboxID,
	}
	s.containers[testSandboxID], err = newContainer(s, reqCreate, vc.PodSandbox, nil, false)
	assert.NoError(err)
	reqStart := &taskAPI.StartRequest{
		ID: testSandboxID,
	}
	sandbox.StartFunc = func() error {
		return nil
	}
	defer func() {
		sandbox.StartFunc = nil
	}()
	ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
	_, err = s.Start(ctx, reqStart)
	assert.NoError(err)
}
// TestStartMissingAnnotation verifies that Start fails — with an error
// produced by the shim itself, not by the mock — when the container
// status carries no container type annotation.
func TestStartMissingAnnotation(t *testing.T) {
	assert := assert.New(t)
	var err error

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	// Return a status without the ContainerType annotation.
	sandbox.StatusContainerFunc = func(contID string) (vc.ContainerStatus, error) {
		return vc.ContainerStatus{
			ID:          sandbox.ID(),
			Annotations: map[string]string{},
		}, nil
	}
	defer func() {
		sandbox.StatusContainerFunc = nil
	}()
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
		ctx:        namespaces.WithNamespace(context.Background(), "UnitTest"),
	}
	reqCreate := &taskAPI.CreateTaskRequest{
		ID: testSandboxID,
	}
	s.containers[testSandboxID], err = newContainer(s, reqCreate, "", nil, false)
	assert.NoError(err)
	reqStart := &taskAPI.StartRequest{
		ID: testSandboxID,
	}
	sandbox.StartFunc = func() error {
		return nil
	}
	defer func() {
		sandbox.StartFunc = nil
	}()
	_, err = s.Start(s.ctx, reqStart)
	assert.Error(err)
	// The failure must come from the shim, not from the mock sandbox.
	assert.False(vcmock.IsMockError(err))
}
// TestStartStartContainerSucess starts a regular pod container through
// the shim Start API and expects it to succeed.
// NOTE(review): the "Sucess" spelling is kept so the test name stays
// stable; renaming would change the test's identifier.
func TestStartStartContainerSucess(t *testing.T) {
	assert := assert.New(t)
	var err error

	sandbox := &vcmock.Sandbox{
		MockID: testSandboxID,
	}
	sandbox.MockContainers = []*vcmock.Container{
		{
			MockID:      testContainerID,
			MockSandbox: sandbox,
		},
	}
	// The container status must identify this as a pod container.
	sandbox.StatusContainerFunc = func(contID string) (vc.ContainerStatus, error) {
		return vc.ContainerStatus{
			ID: testContainerID,
			Annotations: map[string]string{
				vcAnnotations.ContainerTypeKey: string(vc.PodContainer),
			},
		}, nil
	}
	defer func() {
		sandbox.StatusContainerFunc = nil
	}()
	sandbox.StartContainerFunc = func(contID string) (vc.VCContainer, error) {
		return sandbox.MockContainers[0], nil
	}
	defer func() {
		sandbox.StartContainerFunc = nil
	}()
	s := &service{
		id:         testSandboxID,
		sandbox:    sandbox,
		containers: make(map[string]*container),
		ctx:        namespaces.WithNamespace(context.Background(), "UnitTest"),
	}
	reqCreate := &taskAPI.CreateTaskRequest{
		ID: testContainerID,
	}
	s.containers[testContainerID], err = newContainer(s, reqCreate, vc.PodContainer, nil, false)
	assert.NoError(err)
	reqStart := &taskAPI.StartRequest{
		ID: testContainerID,
	}
	ctx := namespaces.WithNamespace(context.Background(), "UnitTest")
	_, err = s.Start(ctx, reqStart)
	assert.NoError(err)
}

View File

@@ -0,0 +1,132 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"io"
"sync"
"syscall"
"github.com/containerd/fifo"
)
// The buffer size used to specify the buffer for IO streams copy
const bufSize = 32 << 10
var (
	// bufPool recycles the copy buffers used by ioCopy so that each
	// stream copy does not allocate a fresh 32KiB buffer. A *[]byte
	// (rather than []byte) is stored to avoid an extra allocation
	// when the value is boxed into interface{}.
	bufPool = sync.Pool{
		New: func() interface{} {
			buffer := make([]byte, bufSize)
			return &buffer
		},
	}
)
// ttyIO groups the shim-side endpoints of a container's stdio
// streams. Any of the fields may be nil when the corresponding
// stream was not requested.
type ttyIO struct {
	Stdin io.ReadCloser
	Stdout io.Writer
	Stderr io.Writer
}
// close releases every stdio endpoint held by the ttyIO: the stdin
// reader is closed directly, while stdout/stderr are closed only when
// they implement io.WriteCloser. nil streams are skipped.
func (tty *ttyIO) close() {
	if tty.Stdin != nil {
		tty.Stdin.Close()
	}
	for _, w := range []io.Writer{tty.Stdout, tty.Stderr} {
		if w == nil {
			continue
		}
		if wc, ok := w.(io.WriteCloser); ok {
			wc.Close()
		}
	}
}
// newTtyIO opens the FIFOs named by stdin, stdout and stderr and
// bundles them into a ttyIO. Empty path strings are skipped. When
// console is true the stderr FIFO is not opened, since terminal
// output is carried on stdout only.
//
// Fix: previously a failure opening a later FIFO leaked the file
// descriptors already opened (e.g. stdin stayed open when opening
// stdout failed); they are now closed on the error path.
func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (*ttyIO, error) {
	var in io.ReadCloser
	var outw io.Writer
	var errw io.Writer
	var err error

	// On any error, close whatever was already opened so we do not
	// leak FIFO file descriptors. stderr is opened last, so only
	// stdin/stdout need cleanup here.
	defer func() {
		if err == nil {
			return
		}
		if in != nil {
			in.Close()
		}
		if c, ok := outw.(io.WriteCloser); ok {
			c.Close()
		}
	}()

	if stdin != "" {
		in, err = fifo.OpenFifo(ctx, stdin, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
		if err != nil {
			return nil, err
		}
	}

	if stdout != "" {
		outw, err = fifo.OpenFifo(ctx, stdout, syscall.O_RDWR, 0)
		if err != nil {
			return nil, err
		}
	}

	if !console && stderr != "" {
		errw, err = fifo.OpenFifo(ctx, stderr, syscall.O_RDWR, 0)
		if err != nil {
			return nil, err
		}
	}

	ttyIO := &ttyIO{
		Stdin:  in,
		Stdout: outw,
		Stderr: errw,
	}

	return ttyIO, nil
}
// ioCopy shuttles data between the shim-side tty streams and the
// process pipes. It spawns one goroutine per active stream, waits for
// all of them to finish, closes the tty and finally closes exitch to
// signal that all IO is done.
//
// stdinCloser is closed as soon as the stdin copy ends so the caller
// knows it may safely close the process's stdin.
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
	var wg sync.WaitGroup
	if tty.Stdin != nil {
		wg.Add(1)
		go func() {
			// Pooled buffer avoids a fresh 32KiB allocation per copy.
			p := bufPool.Get().(*[]byte)
			defer bufPool.Put(p)
			io.CopyBuffer(stdinPipe, tty.Stdin, *p)
			// notify that we can close process's io safely.
			close(stdinCloser)
			wg.Done()
		}()
	}
	if tty.Stdout != nil {
		wg.Add(1)
		go func() {
			p := bufPool.Get().(*[]byte)
			defer bufPool.Put(p)
			io.CopyBuffer(tty.Stdout, stdoutPipe, *p)
			wg.Done()
			if tty.Stdin != nil {
				// close stdin to make the other routine stop
				tty.Stdin.Close()
				// NOTE(review): tty.Stdin is read by the stdin
				// goroutine and mutated here without synchronization;
				// the Close() above unblocks the reader first, but
				// this looks like a data race — verify with -race.
				tty.Stdin = nil
			}
		}()
	}
	if tty.Stderr != nil && stderrPipe != nil {
		wg.Add(1)
		go func() {
			p := bufPool.Get().(*[]byte)
			defer bufPool.Put(p)
			io.CopyBuffer(tty.Stderr, stderrPipe, *p)
			wg.Done()
		}()
	}
	wg.Wait()
	tty.close()
	close(exitch)
}

View File

@@ -0,0 +1,279 @@
// Copyright (c) 2020 Baidu Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"io"
"io/ioutil"
"path/filepath"
"syscall"
"testing"
"time"
"github.com/containerd/fifo"
"github.com/stretchr/testify/assert"
)
// TestNewTtyIOFifoReopen checks that the FIFOs opened by newTtyIO
// survive their read side being closed and reopened: writes issued
// while no reader exists must not fail with EPIPE, and a new reader
// must still receive subsequent data.
func TestNewTtyIOFifoReopen(t *testing.T) {
	var outr io.ReadWriteCloser
	var errr io.ReadWriteCloser
	var tty *ttyIO
	assert := assert.New(t)
	ctx := context.TODO()
	fifoPath, err := ioutil.TempDir(testDir, "fifo-path-")
	assert.NoError(err)
	stdout := filepath.Join(fifoPath, "stdout")
	stderr := filepath.Join(fifoPath, "stderr")
	// createReadFifo opens (creating if necessary) the read side of a FIFO.
	createReadFifo := func(f string) io.ReadWriteCloser {
		rf, err := fifo.OpenFifo(ctx, f, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
		if err != nil {
			t.Fatal(err)
		}
		return rf
	}
	outr = createReadFifo(stdout)
	defer outr.Close()
	errr = createReadFifo(stderr)
	defer errr.Close()
	tty, err = newTtyIO(ctx, "", stdout, stderr, false)
	assert.NoError(err)
	defer tty.close()
	testBytes := []byte("T")
	checkFifoWrite := func(w io.Writer) {
		_, err = w.Write(testBytes)
		assert.NoError(err)
	}
	// checkFifoRead reads one byte with a 2s timeout so a broken FIFO
	// cannot hang the whole test run.
	checkFifoRead := func(r io.Reader) {
		var err error
		buf := make([]byte, 1)
		done := make(chan struct{})
		timer := time.NewTimer(2 * time.Second)
		go func() {
			_, err = r.Read(buf)
			close(done)
		}()
		select {
		case <-done:
			assert.NoError(err)
			assert.Equal(buf, testBytes)
		case <-timer.C:
			t.Fatal("read fifo timeout")
		}
	}
	checkFifoWrite(tty.Stdout)
	checkFifoRead(outr)
	checkFifoWrite(tty.Stderr)
	checkFifoRead(errr)
	err = outr.Close()
	assert.NoError(err)
	err = errr.Close()
	assert.NoError(err)
	// Make sure that writing to tty fifo will not get `EPIPE`
	// when the read side is closed
	checkFifoWrite(tty.Stdout)
	checkFifoWrite(tty.Stderr)
	// Reopen the fifo
	outr = createReadFifo(stdout)
	errr = createReadFifo(stderr)
	checkFifoRead(outr)
	checkFifoRead(errr)
}
// TestIoCopy exercises ioCopy() with two sets of stdin/stdout/stderr
// FIFOs, writing through one set and reading the copied bytes from the
// other. Each combination of closing order is tried, to verify that
// closing one pipe does not break copying on the others.
//
// NOTE(review): this test is deliberately skipped — it depends on
// sleeps and close ordering and fails randomly (see issue #2042).
func TestIoCopy(t *testing.T) {
	t.Skip("TestIoCopy is failing randonly, see https://github.com/kata-containers/kata-containers/issues/2042")
	assert := assert.New(t)
	ctx := context.TODO()
	testBytes1 := []byte("Test1")
	testBytes2 := []byte("Test2")
	testBytes3 := []byte("Test3")
	fifoPath, err := ioutil.TempDir(testDir, "fifo-path-")
	assert.NoError(err)
	dstStdoutPath := filepath.Join(fifoPath, "dststdout")
	dstStderrPath := filepath.Join(fifoPath, "dststderr")
	// test function: create pipes, and use ioCopy() to copy data from one set to the other
	// this function will be called multiple times, testing different combinations of closing order
	// in order to verify that closing a pipe doesn't break the copy for the others
	ioCopyTest := func(first, second, third string) {
		var srcStdinPath string
		// stdin participates only when a third stream is requested.
		if third != "" {
			srcStdinPath = filepath.Join(fifoPath, "srcstdin")
		}
		logErrorMsg := func(msg string) string {
			return "Error found while using order [" + first + " " + second + " " + third + "] - " + msg
		}
		exitioch := make(chan struct{})
		stdinCloser := make(chan struct{})
		// createFifo opens both ends of a FIFO, non-blocking.
		createFifo := func(f string) (io.ReadCloser, io.WriteCloser) {
			reader, err := fifo.OpenFifo(ctx, f, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
			if err != nil {
				t.Fatal(err)
			}
			writer, err := fifo.OpenFifo(ctx, f, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
			if err != nil {
				reader.Close()
				t.Fatal(err)
			}
			return reader, writer
		}
		// create two sets of stdin, stdout and stderr pipes, to copy data from one to the other
		srcOutR, srcOutW := createFifo(filepath.Join(fifoPath, "srcstdout"))
		defer srcOutR.Close()
		defer srcOutW.Close()
		srcErrR, srcErrW := createFifo(filepath.Join(fifoPath, "srcstderr"))
		defer srcErrR.Close()
		defer srcErrW.Close()
		dstInR, dstInW := createFifo(filepath.Join(fifoPath, "dststdin"))
		defer dstInR.Close()
		defer dstInW.Close()
		dstOutR, err := fifo.OpenFifo(ctx, dstStdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
		if err != nil {
			t.Fatal(err)
		}
		defer dstOutR.Close()
		dstErrR, err := fifo.OpenFifo(ctx, dstStderrPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
		if err != nil {
			t.Fatal(err)
		}
		defer dstErrR.Close()
		var srcInW io.WriteCloser
		if srcStdinPath != "" {
			srcInW, err = fifo.OpenFifo(ctx, srcStdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
			if err != nil {
				t.Fatal(err)
			}
			defer srcInW.Close()
		}
		tty, err := newTtyIO(ctx, srcStdinPath, dstStdoutPath, dstStderrPath, false)
		assert.NoError(err)
		defer tty.close()
		// start the ioCopy threads : copy from src to dst
		go ioCopy(exitioch, stdinCloser, tty, dstInW, srcOutR, srcErrR)
		var firstW, secondW, thirdW io.WriteCloser
		var firstR, secondR, thirdR io.Reader
		// getPipes maps a stream name to its read/write endpoints.
		getPipes := func(order string) (io.Reader, io.WriteCloser) {
			switch order {
			case "out":
				return dstOutR, srcOutW
			case "err":
				return dstErrR, srcErrW
			case "in":
				return dstInR, srcInW
			case "":
				return nil, nil
			}
			t.Fatal("internal error")
			return nil, nil
		}
		firstR, firstW = getPipes(first)
		secondR, secondW = getPipes(second)
		thirdR, thirdW = getPipes(third)
		checkFifoWrite := func(w io.Writer, b []byte, name string) {
			_, err := w.Write(b)
			if name == "in" && (name == third || name == second && first == "out") {
				// this is expected: when stdout is closed, ioCopy() will close stdin
				// so if "in" is after "out", we will get here
			} else {
				assert.NoError(err, logErrorMsg("Write error on std"+name))
			}
		}
		// checkFifoRead reads with a 2s timeout; a timeout is only
		// tolerated for stdin closed as a consequence of stdout.
		checkFifoRead := func(r io.Reader, b []byte, name string) {
			var err error
			buf := make([]byte, 5)
			done := make(chan struct{})
			timer := time.NewTimer(2 * time.Second)
			go func() {
				_, err = r.Read(buf)
				close(done)
			}()
			select {
			case <-done:
				assert.NoError(err, logErrorMsg("Error reading from std"+name))
				assert.Equal(b, buf, logErrorMsg("Value mismatch on std"+name))
			case <-timer.C:
				//t.Fatal(logErrorMsg("read fifo timeout on std" + name))
				if name == "in" && (name == third || name == second && first == "out") {
					// this is expected: when stdout is closed, ioCopy() will close stdin
					// so if "in" is after "out", we will get here
				} else {
					assert.Fail(logErrorMsg("read fifo timeout on std" + name))
				}
				return
			}
		}
		// write to each pipe, and close them immediately
		// the ioCopy function should copy the data, then stop the corresponding thread
		checkFifoWrite(firstW, testBytes1, first)
		firstW.Close()
		// need to make sure the Close() above is done before we continue
		time.Sleep(time.Second)
		checkFifoWrite(secondW, testBytes2, second)
		secondW.Close()
		if thirdW != nil {
			// need to make sure the Close() above is done before we continue
			time.Sleep(time.Second)
			checkFifoWrite(thirdW, testBytes3, third)
			thirdW.Close()
		}
		// wait for the end of the ioCopy
		timer := time.NewTimer(2 * time.Second)
		select {
		case <-exitioch:
			// now check that all data has been copied properly
			checkFifoRead(firstR, testBytes1, first)
			checkFifoRead(secondR, testBytes2, second)
			if thirdR != nil {
				checkFifoRead(thirdR, testBytes3, third)
			}
		case <-timer.C:
			t.Fatal(logErrorMsg("timeout waiting for ioCopy()"))
		}
	}
	// try the different combinations
	// tests without stdin
	ioCopyTest("out", "err", "")
	ioCopyTest("err", "out", "")
	// tests with stdin
	ioCopyTest("out", "err", "in")
	ioCopyTest("out", "in", "err")
	ioCopyTest("err", "out", "in")
	ioCopyTest("err", "in", "out")
	ioCopyTest("in", "out", "err")
	ioCopyTest("in", "err", "out")
}

View File

@@ -0,0 +1,124 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/containerd/containerd/mount"
cdshim "github.com/containerd/containerd/runtime/v2/shim"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/compatoci"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
)
// cReap posts an exit record for the given container/exec process onto
// the service's exit channel so the reaper can publish the exit event.
func cReap(s *service, status int, id, execid string, exitat time.Time) {
	exitInfo := exit{
		timestamp: exitat,
		pid:       s.hpid,
		status:    status,
		id:        id,
		execid:    execid,
	}
	s.ec <- exitInfo
}
// cleanupContainer force-cleans the container's virtcontainers state
// and then unmounts the rootfs under its bundle directory.
func cleanupContainer(ctx context.Context, sandboxID, cid, bundlePath string) error {
	shimLog.WithField("service", "cleanup").WithField("container", cid).Info("Cleanup container")

	if err := vci.CleanupContainer(ctx, sandboxID, cid, true); err != nil {
		shimLog.WithError(err).WithField("container", cid).Warn("failed to cleanup container")
		return err
	}

	rootfs := filepath.Join(bundlePath, "rootfs")
	if err := mount.UnmountAll(rootfs, 0); err != nil {
		shimLog.WithError(err).WithField("container", cid).Warn("failed to cleanup container rootfs")
		return err
	}

	return nil
}
// validBundle enforces the OCI runtime spec requirements on the
// containerID/bundlePath pair: both MUST be provided and the bundle
// path MUST be an existing directory. On success it returns the
// resolved (symlink-free) bundle path.
func validBundle(containerID, bundlePath string) (string, error) {
	switch {
	case containerID == "":
		// container ID MUST be provided.
		return "", fmt.Errorf("Missing container ID")
	case bundlePath == "":
		// bundle path MUST be provided.
		return "", fmt.Errorf("Missing bundle path")
	}

	// bundle path MUST be valid.
	fileInfo, err := os.Stat(bundlePath)
	if err != nil {
		return "", fmt.Errorf("Invalid bundle path '%s': %s", bundlePath, err)
	}
	if !fileInfo.IsDir() {
		return "", fmt.Errorf("Invalid bundle path '%s', it should be a directory", bundlePath)
	}

	return katautils.ResolvePath(bundlePath)
}
// getAddress returns the socket address of the shim serving the pod a
// container belongs to. For anything that is not a pod container (or
// when the bundle does not describe one) it returns "", meaning no
// existing shim should be reused.
func getAddress(ctx context.Context, bundlePath, address, id string) (string, error) {
	// Checks the MUST and MUST NOT from OCI runtime specification
	resolved, err := validBundle(id, bundlePath)
	if err != nil {
		return "", err
	}

	ociSpec, err := compatoci.ParseConfigJSON(resolved)
	if err != nil {
		return "", err
	}

	containerType, err := oci.ContainerType(ociSpec)
	if err != nil {
		return "", err
	}

	if containerType != vc.PodContainer {
		// Only pod containers join an existing sandbox shim.
		return "", nil
	}

	sandboxID, err := oci.SandboxID(ociSpec)
	if err != nil {
		return "", err
	}

	return cdshim.SocketAddress(ctx, address, sandboxID)
}
// noNeedForOutput reports whether the process's output can be
// discarded: that is only the case when the caller both detaches from
// the process and allocates a terminal for it.
func noNeedForOutput(detach bool, tty bool) bool {
	return detach && tty
}

View File

@@ -0,0 +1,328 @@
// Copyright (c) 2017 Intel Corporation
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
sysExec "os/exec"
"path"
"path/filepath"
"strings"
"github.com/opencontainers/runtime-spec/specs-go"
ktu "github.com/kata-containers/kata-containers/src/runtime/pkg/katatestutils"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils"
"github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/compatoci"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/vcmock"
)
const (
	// specConf is the name of the file holding the containers configuration
	specConf = "config.json"
	// TestID is a generic container ID usable by tests that do not
	// care about the exact value.
	TestID = "container_test"
	// Permission bits for directories and files created by the tests.
	testDirMode = os.FileMode(0750)
	testFileMode = os.FileMode(0640)
	// testExeFileMode = os.FileMode(0750)
	// small docker image used to create root filesystems from
	testDockerImage = "busybox"
	// Fixed IDs and paths shared across the shim tests.
	testSandboxID = "777-77-77777777"
	testContainerID = "42"
	testBundle = "bundle"
	testConsole = "/dev/pts/888"
	// CRI annotation keys and values used to label test containers.
	testContainerTypeAnnotation = "io.kubernetes.cri.container-type"
	testSandboxIDAnnotation = "io.kubernetes.cri.sandbox-id"
	testContainerTypeSandbox = "sandbox"
	testContainerTypeContainer = "container"
)
var (
	// package variables set by calling TestMain()
	testDir = ""
	testBundleDir = ""
	tc ktu.TestConstraint
	// ctrEngine is the Docker-like container engine used by init() to
	// build the shared test rootfs.
	ctrEngine = katautils.CtrEngine{}
)
// testingImpl is a concrete mock RVC implementation used for testing
var testingImpl = &vcmock.VCMock{}
// init switches the package onto the mock virtcontainers implementation
// and builds a reusable OCI bundle for all tests in this package.
//
// NOTE(review): this init performs heavy side effects — temp dir
// creation, container-engine detection, possibly a network image pull —
// and panics on any failure. That makes the whole test package depend
// on a Docker-like engine being available; keep that in mind before
// adding more work here.
func init() {
	fmt.Printf("INFO: running as actual user %v (effective %v), actual group %v (effective %v)\n",
		os.Getuid(), os.Geteuid(), os.Getgid(), os.Getegid())
	fmt.Printf("INFO: switching to fake virtcontainers implementation for testing\n")
	vci = testingImpl
	var err error
	fmt.Printf("INFO: creating test directory\n")
	testDir, err = ioutil.TempDir("", "shimV2-")
	if err != nil {
		panic(fmt.Sprintf("ERROR: failed to create test directory: %v", err))
	}
	fmt.Printf("INFO: test directory is %v\n", testDir)
	var output string
	// Pick the first usable Docker-like engine from the known list.
	for _, name := range katautils.DockerLikeCtrEngines {
		fmt.Printf("INFO: checking for container engine: %s\n", name)
		output, err = ctrEngine.Init(name)
		if err == nil {
			break
		}
	}
	if ctrEngine.Name == "" {
		panic(fmt.Sprintf("ERROR: Docker-like container engine not accessible to current user: %v (error %v)",
			output, err))
	}
	// Do this now to avoid hitting the test timeout value due to
	// slow network response.
	fmt.Printf("INFO: ensuring required container image (%v) is available\n", testDockerImage)
	// Only hit the network if the image doesn't exist locally
	_, err = ctrEngine.Inspect(testDockerImage)
	if err == nil {
		fmt.Printf("INFO: container image %v already exists locally\n", testDockerImage)
	} else {
		fmt.Printf("INFO: pulling container image %v\n", testDockerImage)
		_, err = ctrEngine.Pull(testDockerImage)
		if err != nil {
			panic(err)
		}
	}
	testBundleDir = filepath.Join(testDir, testBundle)
	err = os.MkdirAll(testBundleDir, testDirMode)
	if err != nil {
		panic(fmt.Sprintf("ERROR: failed to create bundle directory %v: %v", testBundleDir, err))
	}
	fmt.Printf("INFO: creating OCI bundle in %v for tests to use\n", testBundleDir)
	err = realMakeOCIBundle(testBundleDir)
	if err != nil {
		panic(fmt.Sprintf("ERROR: failed to create OCI bundle: %v", err))
	}
	tc = ktu.NewTestConstraint(false)
}
// createOCIConfig generates a pristine OCI configuration (config.json)
// inside bundleDir — which must already exist — by running the first
// runc-like binary found in PATH with "spec --bundle".
func createOCIConfig(bundleDir string) error {
	if bundleDir == "" {
		return errors.New("BUG: Need bundle directory")
	}
	if !katautils.FileExists(bundleDir) {
		return fmt.Errorf("BUG: Bundle directory %s does not exist", bundleDir)
	}

	// Search for a suitable version of runc to use to generate
	// the OCI config file.
	var configCmd string
	for _, cmd := range []string{"docker-runc", "runc"} {
		if fullPath, err := sysExec.LookPath(cmd); err == nil {
			configCmd = fullPath
			break
		}
	}
	if configCmd == "" {
		return errors.New("Cannot find command to generate OCI config file")
	}

	if _, err := utils.RunCommand([]string{configCmd, "spec", "--bundle", bundleDir}); err != nil {
		return err
	}

	specFile := filepath.Join(bundleDir, specConf)
	if !katautils.FileExists(specFile) {
		return fmt.Errorf("generated OCI config file does not exist: %v", specFile)
	}

	return nil
}
// createEmptyFile creates (or truncates) path as an empty file with
// the standard test file permissions.
func createEmptyFile(path string) error {
	return ioutil.WriteFile(path, nil, testFileMode)
}
// newTestHypervisorConfig creates a new virtcontainers
// HypervisorConfig rooted at dir, optionally creating the (empty)
// kernel/image/hypervisor files it references.
//
// Note: no parameter validation in case caller wishes to create an invalid
// object.
func newTestHypervisorConfig(dir string, create bool) (vc.HypervisorConfig, error) {
	kernel := path.Join(dir, "kernel")
	image := path.Join(dir, "image")
	hypervisor := path.Join(dir, "hypervisor")

	if create {
		for _, f := range []string{kernel, image, hypervisor} {
			if err := createEmptyFile(f); err != nil {
				return vc.HypervisorConfig{}, err
			}
		}
	}

	config := vc.HypervisorConfig{
		KernelPath:            kernel,
		ImagePath:             image,
		HypervisorPath:        hypervisor,
		HypervisorMachineType: "q35",
	}
	return config, nil
}
// newTestRuntimeConfig creates a new RuntimeConfig backed by a QEMU
// hypervisor configuration rooted at dir.
func newTestRuntimeConfig(dir, consolePath string, create bool) (oci.RuntimeConfig, error) {
	if dir == "" {
		return oci.RuntimeConfig{}, errors.New("BUG: need directory")
	}

	hypervisorConfig, err := newTestHypervisorConfig(dir, create)
	if err != nil {
		return oci.RuntimeConfig{}, err
	}

	config := oci.RuntimeConfig{
		HypervisorType:   vc.QemuHypervisor,
		HypervisorConfig: hypervisorConfig,
		Console:          consolePath,
	}
	return config, nil
}
// realMakeOCIBundle will create an OCI bundle (including the "config.json"
// config file) in the directory specified (which must already exist).
//
// XXX: Note that tests should *NOT* call this function - they should
// XXX: instead call makeOCIBundle().
func realMakeOCIBundle(bundleDir string) error {
	if bundleDir == "" {
		return errors.New("BUG: Need bundle directory")
	}
	if !katautils.FileExists(bundleDir) {
		return fmt.Errorf("BUG: Bundle directory %v does not exist", bundleDir)
	}
	// Generate config.json first; the rootfs path below is read from it.
	err := createOCIConfig(bundleDir)
	if err != nil {
		return err
	}
	// Note the unusual parameter (a directory, not the config
	// file to parse!)
	spec, err := compatoci.ParseConfigJSON(bundleDir)
	if err != nil {
		return err
	}
	// Determine the rootfs directory name the OCI config refers to
	ociRootPath := spec.Root.Path
	rootfsDir := filepath.Join(bundleDir, ociRootPath)
	if strings.HasPrefix(ociRootPath, "/") {
		return fmt.Errorf("Cannot handle absolute rootfs as bundle must be unique to each test")
	}
	err = createRootfs(rootfsDir)
	if err != nil {
		return err
	}
	return nil
}
// makeOCIBundle creates an OCI bundle in the specified directory.
//
// Note that the directory will be created, but it's parent is expected to exist.
//
// This function works by copying the already-created test bundle. Ideally,
// the bundle would be recreated for each test, but createRootfs() uses
// docker which on some systems is too slow, resulting in the tests timing
// out.
func makeOCIBundle(bundleDir string) error {
	from, to := testBundleDir, bundleDir

	// only the basename of bundleDir needs to exist as bundleDir
	// will get created by cp(1).
	for _, dir := range []string{from, filepath.Dir(bundleDir)} {
		if !katautils.FileExists(dir) {
			return fmt.Errorf("BUG: directory %v should exist", dir)
		}
	}

	if output, err := utils.RunCommandFull([]string{"cp", "-a", from, to}, true); err != nil {
		return fmt.Errorf("failed to copy test OCI bundle from %v to %v: %v (output: %v)", from, to, err, output)
	}

	return nil
}
// createRootfs creates a minimal root filesystem below the specified
// directory by extracting the test image's rootfs from a throwaway
// container, which is removed afterwards.
func createRootfs(dir string) error {
	if err := os.MkdirAll(dir, testDirMode); err != nil {
		return err
	}

	container, err := ctrEngine.Create(testDockerImage)
	if err != nil {
		return err
	}

	if err := ctrEngine.GetRootfs(container, dir); err != nil {
		return err
	}

	// Clean up
	_, err = ctrEngine.Rm(container)
	return err
}
// writeOCIConfigFile serializes spec as indented JSON into configPath
// using the standard test file permissions.
func writeOCIConfigFile(spec specs.Spec, configPath string) error {
	if configPath == "" {
		return errors.New("BUG: need config file path")
	}

	data, err := json.MarshalIndent(spec, "", "\t")
	if err != nil {
		return err
	}

	return ioutil.WriteFile(configPath, data, testFileMode)
}

View File

@@ -0,0 +1,180 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"os"
"path"
"time"
"github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/mount"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
)
// defaultCheckInterval is how long watchOOMEvents waits before retrying
// after a failed GetOOMEvent call.
const defaultCheckInterval = 1 * time.Second
// wait blocks until the container (execID == "") or the exec process
// identified by execID exits, records the exit status/time under the
// service lock, tears down the sandbox when the exiting container is
// the sandbox container itself, and finally schedules a cReap to
// publish the exit event. It returns the process's exit code.
//
// The statement order here is deliberate: IO must be drained (exitIOch)
// before WaitProcess, and the exec ID must be read only after the exec
// has exited — do not reorder.
func wait(ctx context.Context, s *service, c *container, execID string) (int32, error) {
	var execs *exec
	var err error
	processID := c.id
	if execID == "" {
		//wait until the io closed, then wait the container
		<-c.exitIOch
	} else {
		execs, err = c.getExec(execID)
		if err != nil {
			return exitCode255, err
		}
		<-execs.exitIOch
		//This wait could be triggered before exec start which
		//will get the exec's id, thus this assignment must after
		//the exec exit, to make sure it get the exec's id.
		processID = execs.id
	}
	ret, err := s.sandbox.WaitProcess(ctx, c.id, processID)
	if err != nil {
		// Log only: the exit bookkeeping below must still run.
		shimLog.WithError(err).WithFields(logrus.Fields{
			"container": c.id,
			"pid":       processID,
		}).Error("Wait for process failed")
	}
	timeStamp := time.Now()
	s.mu.Lock()
	if execID == "" {
		// Take care of the use case where it is a sandbox.
		// Right after the container representing the sandbox has
		// been deleted, let's make sure we stop and delete the
		// sandbox.
		if c.cType.IsSandbox() {
			// cancel watcher
			if s.monitor != nil {
				s.monitor <- nil
			}
			if err = s.sandbox.Stop(ctx, true); err != nil {
				shimLog.WithField("sandbox", s.sandbox.ID()).Error("failed to stop sandbox")
			}
			if err = s.sandbox.Delete(ctx); err != nil {
				shimLog.WithField("sandbox", s.sandbox.ID()).Error("failed to delete sandbox")
			}
		} else {
			if _, err = s.sandbox.StopContainer(ctx, c.id, false); err != nil {
				shimLog.WithError(err).WithField("container", c.id).Warn("stop container failed")
			}
		}
		c.status = task.StatusStopped
		c.exit = uint32(ret)
		c.exitTime = timeStamp
		c.exitCh <- uint32(ret)
	} else {
		execs.status = task.StatusStopped
		execs.exitCode = ret
		execs.exitTime = timeStamp
		execs.exitCh <- uint32(ret)
	}
	s.mu.Unlock()
	// Publish the exit asynchronously; cReap sends on s.ec.
	go cReap(s, int(ret), c.id, execID, timeStamp)
	return ret, nil
}
// watchSandbox blocks until the sandbox monitor channel delivers an
// error, then performs best-effort cleanup: stop and delete the
// sandbox and unmount the rootfs of every container this shim mounted.
// A nil value on the monitor channel is the cancellation signal (sent
// when the sandbox exits normally) and results in no cleanup.
func watchSandbox(ctx context.Context, s *service) {
	if s.monitor == nil {
		return
	}
	err := <-s.monitor
	if err == nil {
		// Clean shutdown was signalled; nothing to clean up here.
		return
	}
	// NOTE(review): s.monitor is cleared before the lock is taken —
	// presumably no other goroutine writes it at this point; confirm.
	s.monitor = nil
	s.mu.Lock()
	defer s.mu.Unlock()
	// sandbox malfunctioning, cleanup as much as we can
	shimLog.WithError(err).Warn("sandbox stopped unexpectedly")
	err = s.sandbox.Stop(ctx, true)
	if err != nil {
		shimLog.WithError(err).Warn("stop sandbox failed")
	}
	err = s.sandbox.Delete(ctx)
	if err != nil {
		shimLog.WithError(err).Warn("delete sandbox failed")
	}
	for _, c := range s.containers {
		if !c.mounted {
			continue
		}
		rootfs := path.Join(c.bundle, "rootfs")
		shimLog.WithField("rootfs", rootfs).WithField("container", c.id).Debug("container umount rootfs")
		if err := mount.UnmountAll(rootfs, 0); err != nil {
			shimLog.WithError(err).Warn("failed to cleanup rootfs mount")
		}
	}
	// Existing container/exec will be cleaned up by its waiters.
	// No need to send async events here.
}
// watchOOMEvents polls the agent for OOM events until the service
// context is cancelled. For every event it creates an "oom" marker
// file in the bundle of CRI-O managed containers and publishes a
// TaskOOM event to containerd.
func watchOOMEvents(ctx context.Context, s *service) {
	if s.sandbox == nil {
		return
	}
	for {
		select {
		case <-s.ctx.Done():
			return
		default:
			containerID, err := s.sandbox.GetOOMEvent(ctx)
			if err != nil {
				shimLog.WithError(err).Warn("failed to get OOM event from sandbox")
				// If the GetOOMEvent call is not implemented, then the agent is most likely an older version,
				// stop attempting to get OOM events.
				// for rust agent, the response code is not found
				if isGRPCErrorCode(codes.NotFound, err) || err.Error() == "Dead agent" {
					return
				}
				// Back off before retrying so a persistent failure
				// does not spin this loop.
				time.Sleep(defaultCheckInterval)
				continue
			}
			// write oom file for CRI-O
			if c, ok := s.containers[containerID]; ok && oci.IsCRIOContainerManager(c.spec) {
				oomPath := path.Join(c.bundle, "oom")
				shimLog.Infof("write oom file to notify CRI-O: %s", oomPath)
				// The file's existence is the signal; its content is irrelevant.
				f, err := os.OpenFile(oomPath, os.O_CREATE, 0666)
				if err != nil {
					shimLog.WithError(err).Warnf("failed to write oom file %s", oomPath)
				} else {
					f.Close()
				}
			}
			// publish event for containerd
			s.send(&events.TaskOOM{
				ContainerID: containerID,
			})
		}
	}
}

View File

@@ -7,12 +7,13 @@ package katamonitor
import (
"fmt"
cdshim "github.com/containerd/containerd/runtime/v2/shim"
"io"
"net"
"net/http"
shim "github.com/kata-containers/kata-containers/src/runtime/containerd-shim-v2"
cdshim "github.com/containerd/containerd/runtime/v2/shim"
shim "github.com/kata-containers/kata-containers/src/runtime/pkg/containerd-shim-v2"
)
func serveError(w http.ResponseWriter, status int, txt string) {

View File

@@ -13,7 +13,8 @@ import (
"time"
cdshim "github.com/containerd/containerd/runtime/v2/shim"
shim "github.com/kata-containers/kata-containers/src/runtime/containerd-shim-v2"
shim "github.com/kata-containers/kata-containers/src/runtime/pkg/containerd-shim-v2"
)
const (