Files
kata-containers/src/runtime/virtcontainers/nydusd.go
luodaowen.backend 2d9f89aec7 feature(nydusd): add nydusd support to introduse lazyload ability
Pulling image is the most time-consuming step in the container lifecycle. This PR
introduse nydus to kata container, it can lazily pull image when container start. So it
can speed up kata container create and start.

Fixes #2724

Signed-off-by: luodaowen.backend <luodaowen.backend@bytedance.com>
2022-02-11 21:41:17 +08:00

480 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright (c) 2017 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"syscall"
"time"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils/katatrace"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils/retry"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
infoEndpoint = "http://unix/api/v1/daemon"
mountEndpoint = "http://unix/api/v1/mount"
nydusdStopTimeoutSecs = 5
defaultHttpClientTimeoutSecs = 30 * time.Second
contentType = "application/json"
maxIdleConns = 10
idleConnTimeoutSecs = 10 * time.Second
dialTimoutSecs = 5 * time.Second
keepAliveSecs = 5 * time.Second
expectContinueTimeoutSecs = 1 * time.Second
// Registry Acceleration File System which is nydus provide to accelerate image load
nydusRafs = "rafs"
// used to shared directories between host and guest
nydusPassthroughfs = "passthrough_fs"
sharedPathInGuest = "/containers"
shimNsPath = "/proc/self/ns/net"
)
var (
nydusdTracingTags = map[string]string{
"source": "runtime",
"package": "virtcontainers",
"subsystem": "nydusd",
}
errNydusdDaemonPathInvalid = errors.New("nydusd daemon path is invalid")
errNydusdSockPathInvalid = errors.New("nydusd sock path is invalid")
errNydusdAPISockPathInvalid = errors.New("nydusd api sock path is invalid")
errNydusdSourcePathInvalid = errors.New("nydusd resource path is invalid")
errNydusdNotSupport = errors.New("nydusd only supports the QEMU hypervisor currently (see https://github.com/kata-containers/kata-containers/issues/2724)")
)
type nydusd struct {
startFn func(cmd *exec.Cmd) error // for mock testing
setupShareDirFn func() error // for mock testing
path string
sockPath string
apiSockPath string
sourcePath string
extraArgs []string
pid int
debug bool
}
func startInShimNS(cmd *exec.Cmd) error {
// Create nydusd in shim netns as it needs to access host network
return doNetNS(shimNsPath, func(_ ns.NetNS) error {
return cmd.Start()
})
}
func (nd *nydusd) Start(ctx context.Context, onQuit onQuitFunc) (int, error) {
span, _ := katatrace.Trace(ctx, nd.Logger(), "Start", nydusdTracingTags)
defer span.End()
pid := 0
if err := nd.valid(); err != nil {
return pid, err
}
args, err := nd.args()
if err != nil {
return pid, err
}
cmd := exec.Command(nd.path, args...)
r, w, err := os.Pipe()
if err != nil {
return pid, err
}
cmd.Stdout = w
cmd.Stderr = w
fields := logrus.Fields{
"path": nd.path,
"args": strings.Join(args, " "),
}
nd.Logger().WithFields(fields).Info()
if err := nd.startFn(cmd); err != nil {
return pid, err
}
// Monitor nydusd's stdout/stderr and stop sandbox if nydusd quits
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
nd.Logger().Info(scanner.Text())
}
nd.Logger().Info("nydusd quits")
// Wait to release resources of nydusd process
_, err = cmd.Process.Wait()
if err != nil {
nd.Logger().WithError(err).Warn("nydusd quits")
}
if onQuit != nil {
onQuit()
}
}()
if err := nd.setupShareDirFn(); err != nil {
return pid, err
}
nd.pid = cmd.Process.Pid
return nd.pid, nil
}
func (nd *nydusd) args() ([]string, error) {
logLevel := "info"
if nd.debug {
logLevel = "debug"
}
args := []string{
"--log-level", logLevel,
"--apisock", nd.apiSockPath,
"--sock", nd.sockPath,
}
if len(nd.extraArgs) > 0 {
args = append(args, nd.extraArgs...)
}
return args, nil
}
func checkPathValid(path string) error {
if len(path) == 0 {
return errors.New("path is empty")
}
absPath, err := filepath.Abs(path)
if err != nil {
return err
}
dir := filepath.Dir(absPath)
if _, err := os.Stat(dir); err != nil {
return err
}
return nil
}
func (nd *nydusd) valid() error {
if err := checkPathValid(nd.sockPath); err != nil {
nd.Logger().WithError(err).Info("check nydusd sock path err")
return errNydusdSockPathInvalid
}
if err := checkPathValid(nd.apiSockPath); err != nil {
nd.Logger().WithError(err).Info("check nydusd api sock path err")
return errNydusdAPISockPathInvalid
}
if err := checkPathValid(nd.path); err != nil {
nd.Logger().WithError(err).Info("check nydusd daemon path err")
return errNydusdDaemonPathInvalid
}
if err := checkPathValid(nd.sourcePath); err != nil {
nd.Logger().WithError(err).Info("check nydusd daemon path err")
return errNydusdSourcePathInvalid
}
return nil
}
func (nd *nydusd) setupPassthroughFS() error {
nc, err := NewNydusClient(nd.apiSockPath)
if err != nil {
return err
}
nd.Logger().WithField("from", nd.sourcePath).
WithField("dest", sharedPathInGuest).Info("prepare mount passthroughfs")
mr := NewMountRequest(nydusPassthroughfs, nd.sourcePath, "")
return nc.Mount(sharedPathInGuest, mr)
}
func (nd *nydusd) Mount(opt MountOption) error {
nc, err := NewNydusClient(nd.apiSockPath)
if err != nil {
return err
}
nd.Logger().WithField("from", opt.source).
WithField("dest", opt.mountpoint).Info("prepare mount rafs")
mr := NewMountRequest(nydusRafs, opt.source, opt.config)
return nc.Mount(opt.mountpoint, mr)
}
func (nd *nydusd) Umount(mountpoint string) error {
nc, err := NewNydusClient(nd.apiSockPath)
if err != nil {
return err
}
nd.Logger().WithField("mountpoint", mountpoint).Info("umount rafs")
return nc.Umount(mountpoint)
}
func (nd *nydusd) Stop(ctx context.Context) error {
if err := nd.kill(ctx); err != nil {
nd.Logger().WithError(err).WithField("pid", nd.pid).Warn("kill nydusd failed")
return nil
}
err := os.Remove(nd.sockPath)
if err != nil {
nd.Logger().WithError(err).WithField("path", nd.sockPath).Warn("removing nydusd socket failed")
}
err = os.Remove(nd.apiSockPath)
if err != nil {
nd.Logger().WithError(err).WithField("path", nd.apiSockPath).Warn("removing nydusd api socket failed")
}
return nil
}
func (nd *nydusd) Logger() *logrus.Entry {
return hvLogger.WithField("subsystem", "nydusd")
}
func (nd *nydusd) kill(ctx context.Context) (err error) {
span, _ := katatrace.Trace(ctx, nd.Logger(), "kill", nydusdTracingTags)
defer span.End()
if nd.pid <= 0 {
nd.Logger().WithField("invalid-nydusd-pid", nd.pid).Warn("cannot kill nydusd")
return nil
}
if err := utils.WaitLocalProcess(nd.pid, nydusdStopTimeoutSecs, syscall.SIGTERM, nd.Logger()); err != nil {
nd.Logger().WithError(err).Warn("kill nydusd err")
}
nd.pid = 0
return err
}
type BuildTimeInfo struct {
PackageVer string `json:"package_ver"`
GitCommit string `json:"git_commit"`
BuildTime string `json:"build_time"`
Profile string `json:"profile"`
Rustc string `json:"rustc"`
}
type DaemonInfo struct {
ID string `json:"id"`
Version BuildTimeInfo `json:"version"`
State string `json:"state"`
}
type ErrorMessage struct {
Code string `json:"code"`
Message string `json:"message"`
}
type MountRequest struct {
FsType string `json:"fs_type"`
Source string `json:"source"`
Config string `json:"config"`
}
func NewMountRequest(fsType, source, config string) *MountRequest {
return &MountRequest{
FsType: fsType,
Source: source,
Config: config,
}
}
type Interface interface {
CheckStatus() (DaemonInfo, error)
Mount(string, *MountRequest) error
Umount(sharedMountPoint string) error
}
type NydusClient struct {
httpClient *http.Client
}
func NewNydusClient(sock string) (Interface, error) {
transport, err := buildTransport(sock)
if err != nil {
return nil, err
}
return &NydusClient{
httpClient: &http.Client{
Timeout: defaultHttpClientTimeoutSecs,
Transport: transport,
},
}, nil
}
func waitUntilSocketReady(sock string) error {
return retry.Do(func() error {
if _, err := os.Stat(sock); err != nil {
return err
}
return nil
},
retry.Attempts(3),
retry.LastErrorOnly(true),
retry.Delay(100*time.Millisecond))
}
func buildTransport(sock string) (http.RoundTripper, error) {
err := waitUntilSocketReady(sock)
if err != nil {
return nil, err
}
return &http.Transport{
MaxIdleConns: maxIdleConns,
IdleConnTimeout: idleConnTimeoutSecs,
ExpectContinueTimeout: expectContinueTimeoutSecs,
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimoutSecs,
KeepAlive: keepAliveSecs,
}
return dialer.DialContext(ctx, "unix", sock)
},
}, nil
}
func (c *NydusClient) CheckStatus() (DaemonInfo, error) {
resp, err := c.httpClient.Get(infoEndpoint)
if err != nil {
return DaemonInfo{}, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return DaemonInfo{}, err
}
var info DaemonInfo
if err = json.Unmarshal(b, &info); err != nil {
return DaemonInfo{}, err
}
return info, nil
}
func checkRafsMountPointValid(mp string) bool {
// refer to https://github.com/opencontainers/runc/blob/master/libcontainer/factory_linux.go#L30
re := regexp.MustCompile(`/rafs/[\w+-\.]+/lowerdir`)
return re.MatchString(mp)
}
func (c *NydusClient) checkMountPoint(mountPoint string, fsType string) error {
switch fsType {
case nydusPassthroughfs:
// sharedir has been checked in args check.
return nil
case nydusRafs:
// nydusRafs mountpoint path format: /rafs/<container_id>/lowerdir
if checkRafsMountPointValid(mountPoint) {
return nil
}
return fmt.Errorf("rafs mountpoint %s is invalid", mountPoint)
default:
return errors.New("unsupported filesystem type")
}
}
func (c *NydusClient) Mount(mountPoint string, mr *MountRequest) error {
if err := c.checkMountPoint(mountPoint, mr.FsType); err != nil {
return errors.Wrap(err, "check mount point err")
}
requestURL := fmt.Sprintf("%s?mountpoint=%s", mountEndpoint, mountPoint)
body, err := json.Marshal(mr)
if err != nil {
return errors.Wrap(err, "failed to create mount request")
}
resp, err := c.httpClient.Post(requestURL, contentType, bytes.NewBuffer(body))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNoContent {
return nil
}
return handleMountError(resp.Body)
}
func (c *NydusClient) Umount(mountPoint string) error {
requestURL := fmt.Sprintf("%s?mountpoint=%s", mountEndpoint, mountPoint)
req, err := http.NewRequest(http.MethodDelete, requestURL, nil)
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNoContent {
return nil
}
return handleMountError(resp.Body)
}
func handleMountError(r io.Reader) error {
b, err := ioutil.ReadAll(r)
if err != nil {
return err
}
var errMessage ErrorMessage
if err = json.Unmarshal(b, &errMessage); err != nil {
return err
}
return errors.New(errMessage.Message)
}
/*
rootfs mount format: Type: fuse.nydus-overlayfs, Source: overlay
Options[lowerdir=/foo/lower2:/foo/lower1,upperdir=/foo/upper,workdir=/foo/work,extraoption={source: xxx, config: xxx, snapshotdir: xxx}]
*/
type extraOption struct {
Source string `json:"source"`
Config string `json:"config"`
Snapshotdir string `json:"snapshotdir"`
}
const extraOptionKey = "extraoption="
func parseExtraOption(options []string) (*extraOption, error) {
extraOpt := ""
for _, opt := range options {
if strings.HasPrefix(opt, extraOptionKey) {
extraOpt = strings.TrimPrefix(opt, extraOptionKey)
}
}
if len(extraOpt) == 0 {
return nil, errors.New("no extraoption found")
}
opt, err := base64.StdEncoding.DecodeString(extraOpt)
if err != nil {
return nil, errors.Wrap(err, "base64 decoding err")
}
no := &extraOption{}
if err := json.Unmarshal(opt, no); err != nil {
return nil, errors.Wrapf(err, "json unmarshal err")
}
if len(no.Config) == 0 || len(no.Snapshotdir) == 0 || len(no.Source) == 0 {
return nil, fmt.Errorf("extra option is not correct, %+v", no)
}
return no, nil
}