mirror of
https://github.com/aljazceru/kata-containers.git
synced 2026-01-24 08:44:21 +01:00
Merge pull request #3406 from fengwang666/direct-blk-assignment
Implement direct-assigned volume
This commit is contained in:
@@ -7,26 +7,33 @@ package containerdshim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
||||
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
|
||||
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
|
||||
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
)
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
|
||||
const (
|
||||
DirectVolumeStatUrl = "/direct-volume/stats"
|
||||
DirectVolumeResizeUrl = "/direct-volume/resize"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -34,6 +41,11 @@ var (
|
||||
shimMgtLog = shimLog.WithField("subsystem", "shim-management")
|
||||
)
|
||||
|
||||
type ResizeRequest struct {
|
||||
VolumePath string
|
||||
Size uint64
|
||||
}
|
||||
|
||||
// agentURL returns URL for agent
|
||||
func (s *service) agentURL(w http.ResponseWriter, r *http.Request) {
|
||||
url, err := s.sandbox.GetAgentURL()
|
||||
@@ -126,6 +138,52 @@ func decodeAgentMetrics(body string) []*dto.MetricFamily {
|
||||
return list
|
||||
}
|
||||
|
||||
func (s *service) serveVolumeStats(w http.ResponseWriter, r *http.Request) {
|
||||
volumePath, err := url.PathUnescape(strings.TrimPrefix(r.URL.Path, DirectVolumeStatUrl))
|
||||
if err != nil {
|
||||
shimMgtLog.WithError(err).Error("failed to unescape the volume stat url path")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := s.sandbox.GuestVolumeStats(context.Background(), volumePath)
|
||||
if err != nil {
|
||||
shimMgtLog.WithError(err).WithField("volume-path", volumePath).Error("failed to get volume stats")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
w.Write(buf)
|
||||
}
|
||||
|
||||
func (s *service) serveVolumeResize(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
shimMgtLog.WithError(err).Error("failed to read request body")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
var resizeReq ResizeRequest
|
||||
err = json.Unmarshal(body, &resizeReq)
|
||||
if err != nil {
|
||||
shimMgtLog.WithError(err).Error("failed to unmarshal the http request body")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
err = s.sandbox.ResizeGuestVolume(context.Background(), resizeReq.VolumePath, resizeReq.Size)
|
||||
if err != nil {
|
||||
shimMgtLog.WithError(err).WithField("volume-path", resizeReq.VolumePath).Error("failed to resize the volume")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
w.Write([]byte(""))
|
||||
}
|
||||
|
||||
func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) {
|
||||
// metrics socket will under sandbox's bundle path
|
||||
metricsAddress := SocketAddress(s.id)
|
||||
@@ -148,6 +206,8 @@ func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec
|
||||
m := http.NewServeMux()
|
||||
m.Handle("/metrics", http.HandlerFunc(s.serveMetrics))
|
||||
m.Handle("/agent-url", http.HandlerFunc(s.agentURL))
|
||||
m.Handle(DirectVolumeStatUrl, http.HandlerFunc(s.serveVolumeStats))
|
||||
m.Handle(DirectVolumeResizeUrl, http.HandlerFunc(s.serveVolumeResize))
|
||||
s.mountPprofHandle(m, ociSpec)
|
||||
|
||||
// register shim metrics
|
||||
|
||||
108
src/runtime/pkg/direct-volume/utils.go
Normal file
108
src/runtime/pkg/direct-volume/utils.go
Normal file
@@ -0,0 +1,108 @@
|
||||
// Copyright (c) 2022 Databricks Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package volume
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
mountInfoFileName = "mountInfo.json"
|
||||
)
|
||||
|
||||
var kataDirectVolumeRootPath = "/run/kata-containers/shared/direct-volumes"
|
||||
|
||||
// MountInfo contains the information needed by Kata to consume a host block device and mount it as a filesystem inside the guest VM.
|
||||
type MountInfo struct {
|
||||
// The type of the volume (ie. block)
|
||||
VolumeType string `json:"volume-type"`
|
||||
// The device backing the volume.
|
||||
Device string `json:"device"`
|
||||
// The filesystem type to be mounted on the volume.
|
||||
FsType string `json:"fstype"`
|
||||
// Additional metadata to pass to the agent regarding this volume.
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
// Additional mount options.
|
||||
Options []string `json:"options,omitempty"`
|
||||
}
|
||||
|
||||
// Add writes the mount info of a direct volume into a filesystem path known to Kata Container.
|
||||
func Add(volumePath string, mountInfo string) error {
|
||||
volumeDir := filepath.Join(kataDirectVolumeRootPath, volumePath)
|
||||
stat, err := os.Stat(volumeDir)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
if stat != nil && !stat.IsDir() {
|
||||
return fmt.Errorf("%s should be a directory", volumeDir)
|
||||
}
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
if err := os.MkdirAll(volumeDir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var deserialized MountInfo
|
||||
if err := json.Unmarshal([]byte(mountInfo), &deserialized); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ioutil.WriteFile(filepath.Join(volumeDir, mountInfoFileName), []byte(mountInfo), 0600)
|
||||
}
|
||||
|
||||
// Remove deletes the direct volume path including all the files inside it.
|
||||
func Remove(volumePath string) error {
|
||||
// Find the base of the volume path to delete the whole volume path
|
||||
base := strings.SplitN(volumePath, string(os.PathSeparator), 2)[0]
|
||||
return os.RemoveAll(filepath.Join(kataDirectVolumeRootPath, base))
|
||||
}
|
||||
|
||||
// VolumeMountInfo retrieves the mount info of a direct volume.
|
||||
func VolumeMountInfo(volumePath string) (*MountInfo, error) {
|
||||
mountInfoFilePath := filepath.Join(kataDirectVolumeRootPath, volumePath, mountInfoFileName)
|
||||
if _, err := os.Stat(mountInfoFilePath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buf, err := ioutil.ReadFile(mountInfoFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var mountInfo MountInfo
|
||||
if err := json.Unmarshal(buf, &mountInfo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &mountInfo, nil
|
||||
}
|
||||
|
||||
// RecordSandboxId associates a sandbox id with a direct volume.
|
||||
func RecordSandboxId(sandboxId string, volumePath string) error {
|
||||
mountInfoFilePath := filepath.Join(kataDirectVolumeRootPath, volumePath, mountInfoFileName)
|
||||
if _, err := os.Stat(mountInfoFilePath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ioutil.WriteFile(filepath.Join(kataDirectVolumeRootPath, volumePath, sandboxId), []byte(""), 0600)
|
||||
}
|
||||
|
||||
func GetSandboxIdForVolume(volumePath string) (string, error) {
|
||||
files, err := ioutil.ReadDir(filepath.Join(kataDirectVolumeRootPath, volumePath))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
// Find the id of the first sandbox.
|
||||
// We expect a direct-assigned volume is associated with only a sandbox at a time.
|
||||
for _, file := range files {
|
||||
if file.Name() != mountInfoFileName {
|
||||
return file.Name(), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("no sandbox found for %s", volumePath)
|
||||
}
|
||||
94
src/runtime/pkg/direct-volume/utils_test.go
Normal file
94
src/runtime/pkg/direct-volume/utils_test.go
Normal file
@@ -0,0 +1,94 @@
|
||||
// Copyright (c) 2022 Databricks Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package volume
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestAdd(t *testing.T) {
|
||||
var err error
|
||||
kataDirectVolumeRootPath, err = os.MkdirTemp(os.TempDir(), "add-test")
|
||||
assert.Nil(t, err)
|
||||
defer os.RemoveAll(kataDirectVolumeRootPath)
|
||||
var volumePath = "/a/b/c"
|
||||
var basePath = "a"
|
||||
actual := MountInfo{
|
||||
VolumeType: "block",
|
||||
Device: "/dev/sda",
|
||||
FsType: "ext4",
|
||||
Options: []string{"journal_dev", "noload"},
|
||||
}
|
||||
buf, err := json.Marshal(actual)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Add the mount info
|
||||
assert.Nil(t, Add(volumePath, string(buf)))
|
||||
|
||||
// Validate the mount info
|
||||
expected, err := VolumeMountInfo(volumePath)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expected.Device, actual.Device)
|
||||
assert.Equal(t, expected.FsType, actual.FsType)
|
||||
assert.Equal(t, expected.Options, actual.Options)
|
||||
|
||||
// Remove the file
|
||||
err = Remove(volumePath)
|
||||
assert.Nil(t, err)
|
||||
_, err = os.Stat(filepath.Join(kataDirectVolumeRootPath, basePath))
|
||||
assert.True(t, errors.Is(err, os.ErrNotExist))
|
||||
|
||||
// Test invalid mount info json
|
||||
assert.Error(t, Add(volumePath, "{invalid json}"))
|
||||
}
|
||||
|
||||
func TestRecordSandboxId(t *testing.T) {
|
||||
var err error
|
||||
kataDirectVolumeRootPath, err = os.MkdirTemp(os.TempDir(), "recordSanboxId-test")
|
||||
assert.Nil(t, err)
|
||||
defer os.RemoveAll(kataDirectVolumeRootPath)
|
||||
|
||||
var volumePath = "/a/b/c"
|
||||
mntInfo := MountInfo{
|
||||
VolumeType: "block",
|
||||
Device: "/dev/sda",
|
||||
FsType: "ext4",
|
||||
Options: []string{"journal_dev", "noload"},
|
||||
}
|
||||
buf, err := json.Marshal(mntInfo)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// Add the mount info
|
||||
assert.Nil(t, Add(volumePath, string(buf)))
|
||||
|
||||
sandboxId := uuid.Generate().String()
|
||||
err = RecordSandboxId(sandboxId, volumePath)
|
||||
assert.Nil(t, err)
|
||||
|
||||
id, err := GetSandboxIdForVolume(volumePath)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, sandboxId, id)
|
||||
}
|
||||
|
||||
func TestRecordSandboxIdNoMountInfoFile(t *testing.T) {
|
||||
var err error
|
||||
kataDirectVolumeRootPath, err = os.MkdirTemp(os.TempDir(), "recordSanboxId-test")
|
||||
assert.Nil(t, err)
|
||||
defer os.RemoveAll(kataDirectVolumeRootPath)
|
||||
|
||||
var volumePath = "/a/b/c"
|
||||
sandboxId := uuid.Generate().String()
|
||||
err = RecordSandboxId(sandboxId, volumePath)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, errors.Is(err, os.ErrNotExist))
|
||||
}
|
||||
@@ -16,7 +16,7 @@ import (
|
||||
"time"
|
||||
|
||||
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
|
||||
|
||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/utils/shimclient"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/expfmt"
|
||||
|
||||
@@ -224,7 +224,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
|
||||
}
|
||||
|
||||
func getParsedMetrics(sandboxID string, sandboxMetadata sandboxCRIMetadata) ([]*dto.MetricFamily, error) {
|
||||
body, err := doGet(sandboxID, defaultTimeout, "metrics")
|
||||
body, err := shimclient.DoGet(sandboxID, defaultTimeout, "metrics")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -234,7 +234,7 @@ func getParsedMetrics(sandboxID string, sandboxMetadata sandboxCRIMetadata) ([]*
|
||||
|
||||
// GetSandboxMetrics will get sandbox's metrics from shim
|
||||
func GetSandboxMetrics(sandboxID string) (string, error) {
|
||||
body, err := doGet(sandboxID, defaultTimeout, "metrics")
|
||||
body, err := shimclient.DoGet(sandboxID, defaultTimeout, "metrics")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/kata-containers/kata-containers/src/runtime/pkg/utils/shimclient"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -180,7 +182,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
data, err := doGet(sandboxID, defaultTimeout, "agent-url")
|
||||
data, err := shimclient.DoGet(sandboxID, defaultTimeout, "agent-url")
|
||||
if err != nil {
|
||||
commonServeError(w, http.StatusBadRequest, err)
|
||||
return
|
||||
|
||||
@@ -7,13 +7,9 @@ package katamonitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
||||
|
||||
shim "github.com/kata-containers/kata-containers/src/runtime/pkg/containerd-shim-v2"
|
||||
)
|
||||
|
||||
@@ -40,51 +36,3 @@ func getSandboxIDFromReq(r *http.Request) (string, error) {
|
||||
func getSandboxFS() string {
|
||||
return shim.GetSandboxesStoragePath()
|
||||
}
|
||||
|
||||
// BuildShimClient builds and returns an http client for communicating with the provided sandbox
|
||||
func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
|
||||
return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)
|
||||
}
|
||||
|
||||
// buildUnixSocketClient build http client for Unix socket
|
||||
func buildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
|
||||
transport := &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
Dial: func(proto, addr string) (conn net.Conn, err error) {
|
||||
return cdshim.AnonDialer(socketAddr, timeout)
|
||||
},
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
if timeout > 0 {
|
||||
client.Timeout = timeout
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func doGet(sandboxID string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
|
||||
client, err := BuildShimClient(sandboxID, timeoutInSeconds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := client.Get(fmt.Sprintf("http://shim/%s", urlPath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return body, nil
|
||||
}
|
||||
|
||||
79
src/runtime/pkg/utils/shimclient/shim_management_client.go
Normal file
79
src/runtime/pkg/utils/shimclient/shim_management_client.go
Normal file
@@ -0,0 +1,79 @@
|
||||
// Copyright (c) 2022 Databricks Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package shimclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
cdshim "github.com/containerd/containerd/runtime/v2/shim"
|
||||
shim "github.com/kata-containers/kata-containers/src/runtime/pkg/containerd-shim-v2"
|
||||
)
|
||||
|
||||
// BuildShimClient builds and returns an http client for communicating with the provided sandbox
|
||||
func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
|
||||
return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)
|
||||
}
|
||||
|
||||
// buildUnixSocketClient build http client for Unix socket
|
||||
func buildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
|
||||
transport := &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
Dial: func(proto, addr string) (conn net.Conn, err error) {
|
||||
return cdshim.AnonDialer(socketAddr, timeout)
|
||||
},
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
if timeout > 0 {
|
||||
client.Timeout = timeout
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func DoGet(sandboxID string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
|
||||
client, err := BuildShimClient(sandboxID, timeoutInSeconds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := client.Get(fmt.Sprintf("http://shim/%s", urlPath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func DoPost(sandboxID string, timeoutInSeconds time.Duration, urlPath string, payload []byte) error {
|
||||
client, err := BuildShimClient(sandboxID, timeoutInSeconds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := client.Post(fmt.Sprintf("http://shim/%s", urlPath), "application/json", bytes.NewBuffer(payload))
|
||||
defer func() {
|
||||
resp.Body.Close()
|
||||
}()
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user