mirror of
https://github.com/aljazceru/kata-containers.git
synced 2025-12-18 06:44:23 +01:00
Merge pull request #312 from Pennyzct/network_throttle_on_qemu
rate-limiter: network I/O throttling on VM level
This commit is contained in:
@@ -196,6 +196,17 @@ use_vsock = true
|
||||
# Warnings will be logged if any error is encountered while scanning for hooks,
|
||||
# but it will not abort container execution.
|
||||
#guest_hook_path = "/usr/share/oci/hooks"
|
||||
#
|
||||
# Use rx Rate Limiter to control network I/O inbound bandwidth(size in bits/sec for SB/VM).
|
||||
# In Firecracker, it provides a built-in rate limiter, which is based on TBF(Token Bucket Filter)
|
||||
# queueing discipline.
|
||||
# Default 0-sized value means unlimited rate.
|
||||
#rx_rate_limiter_max_rate = 0
|
||||
# Use tx Rate Limiter to control network I/O outbound bandwidth(size in bits/sec for SB/VM).
|
||||
# In Firecracker, it provides a built-in rate limiter, which is based on TBF(Token Bucket Filter)
|
||||
# queueing discipline.
|
||||
# Default 0-sized value means unlimited rate.
|
||||
#tx_rate_limiter_max_rate = 0
|
||||
|
||||
[factory]
|
||||
# VM templating support. Once enabled, new VMs are created from template
|
||||
|
||||
@@ -280,6 +280,16 @@ vhost_user_store_path = "@DEFVHOSTUSERSTOREPATH@"
|
||||
# Warnings will be logged if any error is encountered while scanning for hooks,
|
||||
# but it will not abort container execution.
|
||||
#guest_hook_path = "/usr/share/oci/hooks"
|
||||
#
|
||||
# Use rx Rate Limiter to control network I/O inbound bandwidth(size in bits/sec for SB/VM).
|
||||
# In Qemu, we use classful qdiscs HTB(Hierarchy Token Bucket) to discipline traffic.
|
||||
# Default 0-sized value means unlimited rate.
|
||||
#rx_rate_limiter_max_rate = 0
|
||||
# Use tx Rate Limiter to control network I/O outbound bandwidth(size in bits/sec for SB/VM).
|
||||
# In Qemu, we use classful qdiscs HTB(Hierarchy Token Bucket) and ifb(Intermediate Functional Block)
|
||||
# to discipline traffic.
|
||||
# Default 0-sized value means unlimited rate.
|
||||
#tx_rate_limiter_max_rate = 0
|
||||
|
||||
[factory]
|
||||
# VM templating support. Once enabled, new VMs are created from template
|
||||
|
||||
@@ -51,6 +51,8 @@ const defaultGuestHookPath string = ""
|
||||
const defaultVirtioFSCacheMode = "none"
|
||||
const defaultDisableImageNvdimm = false
|
||||
const defaultVhostUserStorePath string = "/var/run/kata-containers/vhost-user/"
|
||||
const defaultRxRateLimiterMaxRate = uint64(0)
|
||||
const defaultTxRateLimiterMaxRate = uint64(0)
|
||||
|
||||
const defaultTemplatePath string = "/run/vc/vm/template"
|
||||
const defaultVMCacheEndpoint string = "/var/run/kata-containers/cache.sock"
|
||||
|
||||
@@ -129,6 +129,8 @@ type hypervisor struct {
|
||||
HotplugVFIOOnRootBus bool `toml:"hotplug_vfio_on_root_bus"`
|
||||
DisableVhostNet bool `toml:"disable_vhost_net"`
|
||||
GuestHookPath string `toml:"guest_hook_path"`
|
||||
RxRateLimiterMaxRate uint64 `toml:"rx_rate_limiter_max_rate"`
|
||||
TxRateLimiterMaxRate uint64 `toml:"tx_rate_limiter_max_rate"`
|
||||
}
|
||||
|
||||
type proxy struct {
|
||||
@@ -430,6 +432,22 @@ func (h hypervisor) getInitrdAndImage() (initrd string, image string, err error)
|
||||
return
|
||||
}
|
||||
|
||||
func (h hypervisor) getRxRateLimiterCfg() (uint64, error) {
|
||||
if h.RxRateLimiterMaxRate < 0 {
|
||||
return 0, fmt.Errorf("rx Rate Limiter configuration must be greater than or equal to 0, max_rate %v", h.RxRateLimiterMaxRate)
|
||||
}
|
||||
|
||||
return h.RxRateLimiterMaxRate, nil
|
||||
}
|
||||
|
||||
func (h hypervisor) getTxRateLimiterCfg() (uint64, error) {
|
||||
if h.TxRateLimiterMaxRate < 0 {
|
||||
return 0, fmt.Errorf("tx Rate Limiter configuration must be greater than or equal to 0, max_rate %v", h.TxRateLimiterMaxRate)
|
||||
}
|
||||
|
||||
return h.TxRateLimiterMaxRate, nil
|
||||
}
|
||||
|
||||
func (p proxy) path() (string, error) {
|
||||
path := p.Path
|
||||
if path == "" {
|
||||
@@ -534,6 +552,16 @@ func newFirecrackerHypervisorConfig(h hypervisor) (vc.HypervisorConfig, error) {
|
||||
return vc.HypervisorConfig{}, errors.New("No vsock support, firecracker cannot be used")
|
||||
}
|
||||
|
||||
rxRateLimiterMaxRate, err := h.getRxRateLimiterCfg()
|
||||
if err != nil {
|
||||
return vc.HypervisorConfig{}, err
|
||||
}
|
||||
|
||||
txRateLimiterMaxRate, err := h.getTxRateLimiterCfg()
|
||||
if err != nil {
|
||||
return vc.HypervisorConfig{}, err
|
||||
}
|
||||
|
||||
return vc.HypervisorConfig{
|
||||
HypervisorPath: hypervisor,
|
||||
JailerPath: jailer,
|
||||
@@ -558,6 +586,8 @@ func newFirecrackerHypervisorConfig(h hypervisor) (vc.HypervisorConfig, error) {
|
||||
DisableVhostNet: true, // vhost-net backend is not supported in Firecracker
|
||||
UseVSock: true,
|
||||
GuestHookPath: h.guestHookPath(),
|
||||
RxRateLimiterMaxRate: rxRateLimiterMaxRate,
|
||||
TxRateLimiterMaxRate: txRateLimiterMaxRate,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -621,6 +651,16 @@ func newQemuHypervisorConfig(h hypervisor) (vc.HypervisorConfig, error) {
|
||||
}
|
||||
}
|
||||
|
||||
rxRateLimiterMaxRate, err := h.getRxRateLimiterCfg()
|
||||
if err != nil {
|
||||
return vc.HypervisorConfig{}, err
|
||||
}
|
||||
|
||||
txRateLimiterMaxRate, err := h.getTxRateLimiterCfg()
|
||||
if err != nil {
|
||||
return vc.HypervisorConfig{}, err
|
||||
}
|
||||
|
||||
return vc.HypervisorConfig{
|
||||
HypervisorPath: hypervisor,
|
||||
KernelPath: kernel,
|
||||
@@ -665,6 +705,8 @@ func newQemuHypervisorConfig(h hypervisor) (vc.HypervisorConfig, error) {
|
||||
EnableVhostUserStore: h.EnableVhostUserStore,
|
||||
VhostUserStorePath: h.vhostUserStorePath(),
|
||||
GuestHookPath: h.guestHookPath(),
|
||||
RxRateLimiterMaxRate: rxRateLimiterMaxRate,
|
||||
TxRateLimiterMaxRate: txRateLimiterMaxRate,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -1105,6 +1147,8 @@ func GetDefaultHypervisorConfig() vc.HypervisorConfig {
|
||||
VhostUserStorePath: defaultVhostUserStorePath,
|
||||
VirtioFSCache: defaultVirtioFSCacheMode,
|
||||
DisableImageNvdimm: defaultDisableImageNvdimm,
|
||||
RxRateLimiterMaxRate: defaultRxRateLimiterMaxRate,
|
||||
TxRateLimiterMaxRate: defaultTxRateLimiterMaxRate,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -788,6 +788,9 @@ func TestNewQemuHypervisorConfig(t *testing.T) {
|
||||
utils.VHostVSockDevicePath = orgVHostVSockDevicePath
|
||||
}()
|
||||
utils.VHostVSockDevicePath = "/dev/abc/xyz"
|
||||
// 10Mbits/sec
|
||||
rxRateLimiterMaxRate := uint64(10000000)
|
||||
txRateLimiterMaxRate := uint64(10000000)
|
||||
|
||||
hypervisor := hypervisor{
|
||||
Path: hypervisorPath,
|
||||
@@ -799,6 +802,8 @@ func TestNewQemuHypervisorConfig(t *testing.T) {
|
||||
HotplugVFIOOnRootBus: hotplugVFIOOnRootBus,
|
||||
PCIeRootPort: pcieRootPort,
|
||||
UseVSock: true,
|
||||
RxRateLimiterMaxRate: rxRateLimiterMaxRate,
|
||||
TxRateLimiterMaxRate: txRateLimiterMaxRate,
|
||||
}
|
||||
|
||||
files := []string{hypervisorPath, kernelPath, imagePath}
|
||||
@@ -859,6 +864,112 @@ func TestNewQemuHypervisorConfig(t *testing.T) {
|
||||
if config.PCIeRootPort != pcieRootPort {
|
||||
t.Errorf("Expected value for PCIeRootPort %v, got %v", pcieRootPort, config.PCIeRootPort)
|
||||
}
|
||||
|
||||
if config.RxRateLimiterMaxRate != rxRateLimiterMaxRate {
|
||||
t.Errorf("Expected value for rx rate limiter %v, got %v", rxRateLimiterMaxRate, config.RxRateLimiterMaxRate)
|
||||
}
|
||||
|
||||
if config.TxRateLimiterMaxRate != txRateLimiterMaxRate {
|
||||
t.Errorf("Expected value for tx rate limiter %v, got %v", txRateLimiterMaxRate, config.TxRateLimiterMaxRate)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewFirecrackerHypervisorConfig(t *testing.T) {
|
||||
dir, err := ioutil.TempDir(testDir, "hypervisor-config-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
hypervisorPath := path.Join(dir, "hypervisor")
|
||||
kernelPath := path.Join(dir, "kernel")
|
||||
imagePath := path.Join(dir, "image")
|
||||
jailerPath := path.Join(dir, "jailer")
|
||||
disableBlockDeviceUse := false
|
||||
disableVhostNet := true
|
||||
useVSock := true
|
||||
blockDeviceDriver := "virtio-mmio"
|
||||
// !0Mbits/sec
|
||||
rxRateLimiterMaxRate := uint64(10000000)
|
||||
txRateLimiterMaxRate := uint64(10000000)
|
||||
orgVHostVSockDevicePath := utils.VHostVSockDevicePath
|
||||
defer func() {
|
||||
utils.VHostVSockDevicePath = orgVHostVSockDevicePath
|
||||
}()
|
||||
utils.VHostVSockDevicePath = "/dev/null"
|
||||
|
||||
hypervisor := hypervisor{
|
||||
Path: hypervisorPath,
|
||||
Kernel: kernelPath,
|
||||
Image: imagePath,
|
||||
JailerPath: jailerPath,
|
||||
DisableBlockDeviceUse: disableBlockDeviceUse,
|
||||
BlockDeviceDriver: blockDeviceDriver,
|
||||
RxRateLimiterMaxRate: rxRateLimiterMaxRate,
|
||||
TxRateLimiterMaxRate: txRateLimiterMaxRate,
|
||||
}
|
||||
|
||||
files := []string{hypervisorPath, kernelPath, imagePath, jailerPath}
|
||||
filesLen := len(files)
|
||||
|
||||
for i, file := range files {
|
||||
_, err := newFirecrackerHypervisorConfig(hypervisor)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected newFirecrackerHypervisorConfig to fail as not all paths exist (not created %v)",
|
||||
strings.Join(files[i:filesLen], ","))
|
||||
}
|
||||
|
||||
// create the resource
|
||||
err = createEmptyFile(file)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
config, err := newFirecrackerHypervisorConfig(hypervisor)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if config.HypervisorPath != hypervisor.Path {
|
||||
t.Errorf("Expected hypervisor path %v, got %v", hypervisor.Path, config.HypervisorPath)
|
||||
}
|
||||
|
||||
if config.KernelPath != hypervisor.Kernel {
|
||||
t.Errorf("Expected kernel path %v, got %v", hypervisor.Kernel, config.KernelPath)
|
||||
}
|
||||
|
||||
if config.ImagePath != hypervisor.Image {
|
||||
t.Errorf("Expected image path %v, got %v", hypervisor.Image, config.ImagePath)
|
||||
}
|
||||
|
||||
if config.JailerPath != hypervisor.JailerPath {
|
||||
t.Errorf("Expected jailer path %v, got %v", hypervisor.JailerPath, config.JailerPath)
|
||||
}
|
||||
|
||||
if config.DisableBlockDeviceUse != disableBlockDeviceUse {
|
||||
t.Errorf("Expected value for disable block usage %v, got %v", disableBlockDeviceUse, config.DisableBlockDeviceUse)
|
||||
}
|
||||
|
||||
if config.BlockDeviceDriver != blockDeviceDriver {
|
||||
t.Errorf("Expected value for block device driver %v, got %v", blockDeviceDriver, config.BlockDeviceDriver)
|
||||
}
|
||||
|
||||
if config.DisableVhostNet != disableVhostNet {
|
||||
t.Errorf("Expected value for disable vhost net usage %v, got %v", disableVhostNet, config.DisableVhostNet)
|
||||
}
|
||||
|
||||
if config.UseVSock != useVSock {
|
||||
t.Errorf("Expected value for vsock usage %v, got %v", useVSock, config.UseVSock)
|
||||
}
|
||||
|
||||
if config.RxRateLimiterMaxRate != rxRateLimiterMaxRate {
|
||||
t.Errorf("Expected value for rx rate limiter %v, got %v", rxRateLimiterMaxRate, config.RxRateLimiterMaxRate)
|
||||
}
|
||||
|
||||
if config.TxRateLimiterMaxRate != txRateLimiterMaxRate {
|
||||
t.Errorf("Expected value for tx rate limiter %v, got %v", txRateLimiterMaxRate, config.TxRateLimiterMaxRate)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewQemuHypervisorConfigImageAndInitrd(t *testing.T) {
|
||||
|
||||
@@ -813,3 +813,7 @@ func (a *Acrn) loadInfo() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Acrn) isRateLimiterBuiltin() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ type BridgedMacvlanEndpoint struct {
|
||||
EndpointProperties NetworkInfo
|
||||
EndpointType EndpointType
|
||||
PCIAddr string
|
||||
RxRateLimiter bool
|
||||
TxRateLimiter bool
|
||||
}
|
||||
|
||||
func createBridgedMacvlanNetworkEndpoint(idx int, ifName string, interworkingModel NetInterworkingModel) (*BridgedMacvlanEndpoint, error) {
|
||||
@@ -136,3 +138,21 @@ func (endpoint *BridgedMacvlanEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.NetPair = *netpair
|
||||
}
|
||||
}
|
||||
|
||||
func (endpoint *BridgedMacvlanEndpoint) GetRxRateLimiter() bool {
|
||||
return endpoint.RxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *BridgedMacvlanEndpoint) SetRxRateLimiter() error {
|
||||
endpoint.RxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (endpoint *BridgedMacvlanEndpoint) GetTxRateLimiter() bool {
|
||||
return endpoint.TxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *BridgedMacvlanEndpoint) SetTxRateLimiter() error {
|
||||
endpoint.TxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1210,3 +1210,7 @@ func (clh *cloudHypervisor) vmInfo() (chclient.VmInfo, error) {
|
||||
return info, openAPIClientError(err)
|
||||
|
||||
}
|
||||
|
||||
func (clh *cloudHypervisor) isRateLimiterBuiltin() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -29,6 +29,11 @@ type Endpoint interface {
|
||||
|
||||
save() persistapi.NetworkEndpoint
|
||||
load(persistapi.NetworkEndpoint)
|
||||
|
||||
GetRxRateLimiter() bool
|
||||
SetRxRateLimiter() error
|
||||
GetTxRateLimiter() bool
|
||||
SetTxRateLimiter() error
|
||||
}
|
||||
|
||||
// EndpointType identifies the type of the network endpoint.
|
||||
|
||||
@@ -25,12 +25,12 @@ import (
|
||||
|
||||
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/device/config"
|
||||
persistapi "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/persist/api"
|
||||
kataclient "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/client"
|
||||
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/firecracker/client"
|
||||
models "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/firecracker/client/models"
|
||||
ops "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/firecracker/client/operations"
|
||||
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/types"
|
||||
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils"
|
||||
kataclient "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/client"
|
||||
|
||||
"github.com/blang/semver"
|
||||
"github.com/containerd/console"
|
||||
@@ -908,11 +908,52 @@ func (fc *firecracker) fcAddNetDevice(endpoint Endpoint) {
|
||||
defer span.Finish()
|
||||
|
||||
ifaceID := endpoint.Name()
|
||||
|
||||
// The implementation of rate limiter is based on TBF.
|
||||
// Rate Limiter defines a token bucket with a maximum capacity (size) to store tokens, and an interval for refilling purposes (refill_time).
|
||||
// The refill-rate is derived from size and refill_time, and it is the constant rate at which the tokens replenish.
|
||||
refillTime := uint64(1000)
|
||||
var rxRateLimiter models.RateLimiter
|
||||
rxSize := fc.config.RxRateLimiterMaxRate
|
||||
if rxSize > 0 {
|
||||
fc.Logger().Info("Add rx rate limiter")
|
||||
|
||||
// kata-defined rxSize is in bits with scaling factors of 1000, but firecracker-defined
|
||||
// rxSize is in bytes with scaling factors of 1024, need reversion.
|
||||
rxSize = revertBytes(rxSize / 8)
|
||||
rxTokenBucket := models.TokenBucket{
|
||||
RefillTime: &refillTime,
|
||||
Size: &rxSize,
|
||||
}
|
||||
rxRateLimiter = models.RateLimiter{
|
||||
Bandwidth: &rxTokenBucket,
|
||||
}
|
||||
}
|
||||
|
||||
var txRateLimiter models.RateLimiter
|
||||
txSize := fc.config.TxRateLimiterMaxRate
|
||||
if txSize > 0 {
|
||||
fc.Logger().Info("Add tx rate limiter")
|
||||
|
||||
// kata-defined txSize is in bits with scaling factors of 1000, but firecracker-defined
|
||||
// txSize is in bytes with scaling factors of 1024, need reversion.
|
||||
txSize = revertBytes(txSize / 8)
|
||||
txTokenBucket := models.TokenBucket{
|
||||
RefillTime: &refillTime,
|
||||
Size: &txSize,
|
||||
}
|
||||
txRateLimiter = models.RateLimiter{
|
||||
Bandwidth: &txTokenBucket,
|
||||
}
|
||||
}
|
||||
|
||||
ifaceCfg := &models.NetworkInterface{
|
||||
AllowMmdsRequests: false,
|
||||
GuestMac: endpoint.HardwareAddr(),
|
||||
IfaceID: &ifaceID,
|
||||
HostDevName: &endpoint.NetworkPair().TapInterface.TAPIface.Name,
|
||||
RxRateLimiter: &rxRateLimiter,
|
||||
TxRateLimiter: &txRateLimiter,
|
||||
}
|
||||
|
||||
fc.fcConfig.NetworkInterfaces = append(fc.fcConfig.NetworkInterfaces, ifaceCfg)
|
||||
@@ -1212,3 +1253,20 @@ func (fc *firecracker) watchConsole() (*os.File, error) {
|
||||
|
||||
return stdio, nil
|
||||
}
|
||||
|
||||
func (fc *firecracker) isRateLimiterBuiltin() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// revertBytes converts num from decimal (10^3) scaling to binary (2^10)
// scaling: every factor of 1000 in num's base-1000 expansion is replaced by a
// factor of 1024, e.g. 10000000 (10 * 1000 * 1000) becomes 10485760
// (10 * 1024 * 1024).
//
// Firecracker accepts the size of a rate limiter in scaling factors of 1024,
// while the kata-defined rate limiter uses scaling factors of 1000 for better
// human readability.
func revertBytes(num uint64) uint64 {
	// Values below 1000 carry no scaling factor and are returned unchanged.
	if num < 1000 {
		return num
	}

	// Rescale the higher-order base-1000 digits, then re-attach the remainder.
	return 1024*revertBytes(num/1000) + num%1000
}
|
||||
|
||||
@@ -45,3 +45,14 @@ func TestFCTruncateID(t *testing.T) {
|
||||
id = fc.truncateID(testShortID)
|
||||
assert.Equal(expectedID, id)
|
||||
}
|
||||
|
||||
func TestRevertBytes(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//10MB
|
||||
testNum := uint64(10000000)
|
||||
expectedNum := uint64(10485760)
|
||||
|
||||
num := revertBytes(testNum)
|
||||
assert.Equal(expectedNum, num)
|
||||
}
|
||||
|
||||
@@ -412,6 +412,12 @@ type HypervisorConfig struct {
|
||||
|
||||
// SELinux label for the VM
|
||||
SELinuxProcessLabel string
|
||||
|
||||
// RxRateLimiterMaxRate is used to control network I/O inbound bandwidth on VM level.
|
||||
RxRateLimiterMaxRate uint64
|
||||
|
||||
// TxRateLimiterMaxRate is used to control network I/O outbound bandwidth on VM level.
|
||||
TxRateLimiterMaxRate uint64
|
||||
}
|
||||
|
||||
// vcpu mapping from vcpu number to thread number
|
||||
@@ -796,4 +802,7 @@ type hypervisor interface {
|
||||
|
||||
// generate the socket to communicate the host and guest
|
||||
generateSocket(id string, useVsock bool) (interface{}, error)
|
||||
|
||||
// check if hypervisor supports built-in rate limiter.
|
||||
isRateLimiterBuiltin() bool
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ type IPVlanEndpoint struct {
|
||||
EndpointProperties NetworkInfo
|
||||
EndpointType EndpointType
|
||||
PCIAddr string
|
||||
RxRateLimiter bool
|
||||
TxRateLimiter bool
|
||||
}
|
||||
|
||||
func createIPVlanNetworkEndpoint(idx int, ifName string) (*IPVlanEndpoint, error) {
|
||||
@@ -139,3 +141,21 @@ func (endpoint *IPVlanEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.NetPair = *netpair
|
||||
}
|
||||
}
|
||||
|
||||
func (endpoint *IPVlanEndpoint) GetRxRateLimiter() bool {
|
||||
return endpoint.RxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *IPVlanEndpoint) SetRxRateLimiter() error {
|
||||
endpoint.RxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (endpoint *IPVlanEndpoint) GetTxRateLimiter() bool {
|
||||
return endpoint.TxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *IPVlanEndpoint) SetTxRateLimiter() error {
|
||||
endpoint.TxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -19,6 +19,8 @@ type MacvtapEndpoint struct {
|
||||
VMFds []*os.File
|
||||
VhostFds []*os.File
|
||||
PCIAddr string
|
||||
RxRateLimiter bool
|
||||
TxRateLimiter bool
|
||||
}
|
||||
|
||||
func createMacvtapNetworkEndpoint(netInfo NetworkInfo) (*MacvtapEndpoint, error) {
|
||||
@@ -121,3 +123,21 @@ func (endpoint *MacvtapEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.PCIAddr = s.Macvtap.PCIAddr
|
||||
}
|
||||
}
|
||||
|
||||
func (endpoint *MacvtapEndpoint) GetRxRateLimiter() bool {
|
||||
return endpoint.RxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *MacvtapEndpoint) SetRxRateLimiter() error {
|
||||
endpoint.RxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (endpoint *MacvtapEndpoint) GetTxRateLimiter() bool {
|
||||
return endpoint.TxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *MacvtapEndpoint) SetTxRateLimiter() error {
|
||||
endpoint.TxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -128,3 +128,7 @@ func (m *mockHypervisor) check() error {
|
||||
func (m *mockHypervisor) generateSocket(id string, useVsock bool) (interface{}, error) {
|
||||
return types.Socket{HostPath: "/tmp/socket", Name: "socket"}, nil
|
||||
}
|
||||
|
||||
func (m *mockHypervisor) isRateLimiterBuiltin() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"sort"
|
||||
"time"
|
||||
@@ -1257,6 +1258,25 @@ func (n *Network) Add(ctx context.Context, config *NetworkConfig, hypervisor hyp
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !hypervisor.isRateLimiterBuiltin() {
|
||||
rxRateLimiterMaxRate := hypervisor.hypervisorConfig().RxRateLimiterMaxRate
|
||||
if rxRateLimiterMaxRate > 0 {
|
||||
networkLogger().Info("Add Rx Rate Limiter")
|
||||
if err := addRxRateLimiter(endpoint, rxRateLimiterMaxRate); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
txRateLimiterMaxRate := hypervisor.hypervisorConfig().TxRateLimiterMaxRate
|
||||
if txRateLimiterMaxRate > 0 {
|
||||
networkLogger().Info("Add Tx Rate Limiter")
|
||||
if err := addTxRateLimiter(endpoint, txRateLimiterMaxRate); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -1303,6 +1323,22 @@ func (n *Network) Remove(ctx context.Context, ns *NetworkNamespace, hypervisor h
|
||||
defer span.Finish()
|
||||
|
||||
for _, endpoint := range ns.Endpoints {
|
||||
if endpoint.GetRxRateLimiter() {
|
||||
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Deleting rx rate limiter")
|
||||
// Deleting rx rate limiter should enter the network namespace.
|
||||
if err := removeRxRateLimiter(endpoint, ns.NetNsPath); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if endpoint.GetTxRateLimiter() {
|
||||
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Deleting tx rate limiter")
|
||||
// Deleting tx rate limiter should enter the network namespace.
|
||||
if err := removeTxRateLimiter(endpoint, ns.NetNsPath); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Detach for an endpoint should enter the network namespace
|
||||
// if required.
|
||||
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Detaching endpoint")
|
||||
@@ -1320,3 +1356,320 @@ func (n *Network) Remove(ctx context.Context, ns *NetworkNamespace, hypervisor h
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// func addRxRateLmiter implements tc-based rx rate limiter to control network I/O inbound traffic
|
||||
// on VM level for hypervisors which don't implement rate limiter in itself, like qemu, etc.
|
||||
func addRxRateLimiter(endpoint Endpoint, maxRate uint64) error {
|
||||
var linkName string
|
||||
switch ep := endpoint.(type) {
|
||||
case *VethEndpoint, *IPVlanEndpoint, *TuntapEndpoint, *BridgedMacvlanEndpoint:
|
||||
netPair := endpoint.NetworkPair()
|
||||
linkName = netPair.TapInterface.TAPIface.Name
|
||||
case *MacvtapEndpoint, *TapEndpoint:
|
||||
linkName = endpoint.Name()
|
||||
default:
|
||||
return fmt.Errorf("Unsupported endpointType %s for adding rx rate limiter", ep.Type())
|
||||
}
|
||||
|
||||
if err := endpoint.SetRxRateLimiter(); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
link, err := netlink.LinkByName(linkName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
linkIndex := link.Attrs().Index
|
||||
|
||||
return addHTBQdisc(linkIndex, maxRate)
|
||||
}
|
||||
|
||||
// func addHTBQdisc uses HTB(Hierarchical Token Bucket) qdisc shaping schemes to control interface traffic.
|
||||
// HTB (Hierarchical Token Bucket) shapes traffic based on the Token Bucket Filter algorithm.
|
||||
// A fundamental part of the HTB qdisc is the borrowing mechanism. Children classes borrow tokens
|
||||
// from their parents once they have exceeded rate. A child class will continue to attempt to borrow until
|
||||
// it reaches ceil. See more details in https://tldp.org/HOWTO/Traffic-Control-HOWTO/classful-qdiscs.html.
|
||||
//
|
||||
// * +-----+ +---------+ +-----------+ +-----------+
|
||||
// * | | | qdisc | | class 1:1 | | class 1:2 |
|
||||
// * | NIC | | htb | | rate | | rate |
|
||||
// * | | --> | def 1:2 | --> | ceil | -+-> | ceil |
|
||||
// * +-----+ +---------+ +-----------+ | +-----------+
|
||||
// * |
|
||||
// * | +-----------+
|
||||
// * | | class 1:n |
|
||||
// * | | rate |
|
||||
// * +-> | ceil |
|
||||
// * | +-----------+
|
||||
// Seeing from pic, after the routing decision, all packets will be sent to the interface root htb qdisc.
|
||||
// This root qdisc has only one direct child class (with id 1:1) which shapes the overall maximum rate
|
||||
// that will be sent through interface. Then, this class has at least one default child (1:2) meant to control all
|
||||
// non-privileged traffic.
|
||||
// e.g.
|
||||
// if we try to set VM bandwidth with maximum 10Mbit/s, we should give
|
||||
// classid 1:2 rate 10Mbit/s, ceil 10Mbit/s and classid 1:1 rate 10Mbit/s, ceil 10Mbit/s.
|
||||
// To-do:
|
||||
// Later, if we want to do limitation on some dedicated traffic(special process running in VM), we could create
|
||||
// a separate class (1:n) with guarantee throughput.
|
||||
func addHTBQdisc(linkIndex int, maxRate uint64) error {
|
||||
// we create a new htb root qdisc for network interface with the specified network index
|
||||
qdiscAttrs := netlink.QdiscAttrs{
|
||||
LinkIndex: linkIndex,
|
||||
Handle: netlink.MakeHandle(1, 0),
|
||||
Parent: netlink.HANDLE_ROOT,
|
||||
}
|
||||
qdisc := netlink.NewHtb(qdiscAttrs)
|
||||
// all non-priviledged traffic go to classid 1:2.
|
||||
qdisc.Defcls = 2
|
||||
|
||||
err := netlink.QdiscAdd(qdisc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to add htb qdisc: %v", err)
|
||||
}
|
||||
|
||||
// root htb qdisc has only one direct child class (with id 1:1) to control overall rate.
|
||||
classAttrs := netlink.ClassAttrs{
|
||||
LinkIndex: linkIndex,
|
||||
Parent: netlink.MakeHandle(1, 0),
|
||||
Handle: netlink.MakeHandle(1, 1),
|
||||
}
|
||||
htbClassAttrs := netlink.HtbClassAttrs{
|
||||
Rate: maxRate,
|
||||
Ceil: maxRate,
|
||||
}
|
||||
class := netlink.NewHtbClass(classAttrs, htbClassAttrs)
|
||||
if err := netlink.ClassAdd(class); err != nil {
|
||||
return fmt.Errorf("Failed to add htb classid 1:1 : %v", err)
|
||||
}
|
||||
|
||||
// above class has at least one default child class(1:2) for all non-priviledged traffic.
|
||||
classAttrs = netlink.ClassAttrs{
|
||||
LinkIndex: linkIndex,
|
||||
Parent: netlink.MakeHandle(1, 1),
|
||||
Handle: netlink.MakeHandle(1, 2),
|
||||
}
|
||||
htbClassAttrs = netlink.HtbClassAttrs{
|
||||
Rate: maxRate,
|
||||
Ceil: maxRate,
|
||||
}
|
||||
class = netlink.NewHtbClass(classAttrs, htbClassAttrs)
|
||||
if err := netlink.ClassAdd(class); err != nil {
|
||||
return fmt.Errorf("Failed to add htb class 1:2 : %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// The Intermediate Functional Block (ifb) pseudo network interface is an alternative
|
||||
// to tc filters for handling ingress traffic,
|
||||
// By redirecting interface ingress traffic to ifb and treat it as egress traffic there,
|
||||
// we could do network shaping to interface inbound traffic.
|
||||
func addIFBDevice() (int, error) {
|
||||
// check whether host supports ifb
|
||||
if ok, err := utils.SupportsIfb(); !ok {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
netHandle, err := netlink.NewHandle()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
defer netHandle.Delete()
|
||||
|
||||
// There exists error when using netlink library to create ifb interface
|
||||
cmd := exec.Command("ip", "link", "add", "dev", "ifb0", "type", "ifb")
|
||||
if output, err := cmd.CombinedOutput(); err != nil {
|
||||
return -1, fmt.Errorf("Could not create link ifb0: %v, error %v", output, err)
|
||||
}
|
||||
|
||||
ifbLink, err := netlink.LinkByName("ifb0")
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if err := netHandle.LinkSetUp(ifbLink); err != nil {
|
||||
return -1, fmt.Errorf("Could not enable link ifb0 %v", err)
|
||||
}
|
||||
|
||||
return ifbLink.Attrs().Index, nil
|
||||
}
|
||||
|
||||
// This is equivalent to calling:
|
||||
// tc filter add dev source parent ffff: protocol all u32 match u8 0 0 action mirred egress redirect dev ifb
|
||||
func addIFBRedirecting(sourceIndex int, ifbIndex int) error {
|
||||
if err := addQdiscIngress(sourceIndex); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := addRedirectTCFilter(sourceIndex, ifbIndex); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// func addTxRateLmiter implements tx rate limiter to control network I/O outbound traffic
|
||||
// on VM level for hypervisors which don't implement rate limiter in itself, like qemu, etc.
|
||||
// We adopt different actions, based on different inter-networking models.
|
||||
// For tcfilters as inter-networking model, we simply apply htb qdisc discipline to the virtual netpair.
|
||||
// For other inter-networking models, such as macvtap, we resort to ifb, by redirecting endpoint ingress traffic
|
||||
// to ifb egress, and then apply htb to ifb egress.
|
||||
func addTxRateLimiter(endpoint Endpoint, maxRate uint64) error {
|
||||
var netPair *NetworkInterfacePair
|
||||
var linkName string
|
||||
switch ep := endpoint.(type) {
|
||||
case *VethEndpoint, *IPVlanEndpoint, *TuntapEndpoint, *BridgedMacvlanEndpoint:
|
||||
netPair = endpoint.NetworkPair()
|
||||
switch netPair.NetInterworkingModel {
|
||||
// For those endpoints we've already used tcfilter as their inter-networking model,
|
||||
// another ifb redirect will be redundant and confused.
|
||||
case NetXConnectTCFilterModel:
|
||||
linkName = netPair.VirtIface.Name
|
||||
link, err := netlink.LinkByName(linkName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return addHTBQdisc(link.Attrs().Index, maxRate)
|
||||
case NetXConnectMacVtapModel, NetXConnectNoneModel:
|
||||
linkName = netPair.TapInterface.TAPIface.Name
|
||||
default:
|
||||
return fmt.Errorf("Unsupported inter-networking model %v for adding tx rate limiter", netPair.NetInterworkingModel)
|
||||
}
|
||||
|
||||
case *MacvtapEndpoint, *TapEndpoint:
|
||||
linkName = endpoint.Name()
|
||||
default:
|
||||
return fmt.Errorf("Unsupported endpointType %s for adding tx rate limiter", ep.Type())
|
||||
}
|
||||
|
||||
if err := endpoint.SetTxRateLimiter(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ifbIndex, err := addIFBDevice()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
link, err := netlink.LinkByName(linkName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := addIFBRedirecting(link.Attrs().Index, ifbIndex); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return addHTBQdisc(ifbIndex, maxRate)
|
||||
}
|
||||
|
||||
func removeHTBQdisc(linkName string) error {
|
||||
link, err := netlink.LinkByName(linkName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Get link %s by name failed: %v", linkName, err)
|
||||
}
|
||||
|
||||
qdiscs, err := netlink.QdiscList(link)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, qdisc := range qdiscs {
|
||||
htb, ok := qdisc.(*netlink.Htb)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := netlink.QdiscDel(htb); err != nil {
|
||||
return fmt.Errorf("Failed to delete htb qdisc on link %s: %v", linkName, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeRxRateLimiter(endpoint Endpoint, networkNSPath string) error {
|
||||
var linkName string
|
||||
switch ep := endpoint.(type) {
|
||||
case *VethEndpoint, *IPVlanEndpoint, *TuntapEndpoint, *BridgedMacvlanEndpoint:
|
||||
netPair := endpoint.NetworkPair()
|
||||
linkName = netPair.TapInterface.TAPIface.Name
|
||||
case *MacvtapEndpoint, *TapEndpoint:
|
||||
linkName = endpoint.Name()
|
||||
default:
|
||||
return fmt.Errorf("Unsupported endpointType %s for removing rx rate limiter", ep.Type())
|
||||
}
|
||||
|
||||
if err := doNetNS(networkNSPath, func(_ ns.NetNS) error {
|
||||
return removeHTBQdisc(linkName)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeTxRateLimiter(endpoint Endpoint, networkNSPath string) error {
|
||||
var linkName string
|
||||
switch ep := endpoint.(type) {
|
||||
case *VethEndpoint, *IPVlanEndpoint, *TuntapEndpoint, *BridgedMacvlanEndpoint:
|
||||
netPair := endpoint.NetworkPair()
|
||||
switch netPair.NetInterworkingModel {
|
||||
case NetXConnectTCFilterModel:
|
||||
linkName = netPair.VirtIface.Name
|
||||
if err := doNetNS(networkNSPath, func(_ ns.NetNS) error {
|
||||
return removeHTBQdisc(linkName)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
case NetXConnectMacVtapModel, NetXConnectNoneModel:
|
||||
linkName = netPair.TapInterface.TAPIface.Name
|
||||
}
|
||||
case *MacvtapEndpoint, *TapEndpoint:
|
||||
linkName = endpoint.Name()
|
||||
default:
|
||||
return fmt.Errorf("Unsupported endpointType %s for adding tx rate limiter", ep.Type())
|
||||
}
|
||||
|
||||
if err := doNetNS(networkNSPath, func(_ ns.NetNS) error {
|
||||
link, err := netlink.LinkByName(linkName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Get link %s by name failed: %v", linkName, err)
|
||||
}
|
||||
|
||||
if err := removeRedirectTCFilter(link); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := removeQdiscIngress(link); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
netHandle, err := netlink.NewHandle()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer netHandle.Delete()
|
||||
|
||||
// remove ifb interface
|
||||
ifbLink, err := netlink.LinkByName("ifb0")
|
||||
if err != nil {
|
||||
return fmt.Errorf("Get link %s by name failed: %v", linkName, err)
|
||||
}
|
||||
|
||||
if err := netHandle.LinkSetDown(ifbLink); err != nil {
|
||||
return fmt.Errorf("Could not disable ifb interface: %v", err)
|
||||
}
|
||||
|
||||
if err := netHandle.LinkDel(ifbLink); err != nil {
|
||||
return fmt.Errorf("Could not remove ifb interface: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/containernetworking/plugins/pkg/ns"
|
||||
ktu "github.com/kata-containers/kata-containers/src/runtime/pkg/katatestutils"
|
||||
vcTypes "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -284,3 +285,101 @@ func TestTcRedirectNetwork(t *testing.T) {
|
||||
err = netHandle.LinkDel(link)
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
func TestRxRateLimiter(t *testing.T) {
|
||||
if tc.NotValid(ktu.NeedRoot()) {
|
||||
t.Skip(testDisabledAsNonRoot)
|
||||
}
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
netHandle, err := netlink.NewHandle()
|
||||
assert.NoError(err)
|
||||
defer netHandle.Delete()
|
||||
|
||||
// Create a test veth interface.
|
||||
vethName := "foo"
|
||||
veth := &netlink.Veth{LinkAttrs: netlink.LinkAttrs{Name: vethName, TxQLen: 200, MTU: 1400}, PeerName: "bar"}
|
||||
|
||||
err = netlink.LinkAdd(veth)
|
||||
assert.NoError(err)
|
||||
|
||||
endpoint, err := createVethNetworkEndpoint(1, vethName, NetXConnectTCFilterModel)
|
||||
assert.NoError(err)
|
||||
|
||||
link, err := netlink.LinkByName(vethName)
|
||||
assert.NoError(err)
|
||||
|
||||
err = netHandle.LinkSetUp(link)
|
||||
assert.NoError(err)
|
||||
|
||||
err = setupTCFiltering(endpoint, 1, true)
|
||||
assert.NoError(err)
|
||||
|
||||
// 10Mb
|
||||
maxRate := uint64(10000000)
|
||||
err = addRxRateLimiter(endpoint, maxRate)
|
||||
assert.NoError(err)
|
||||
|
||||
currentNS, err := ns.GetCurrentNS()
|
||||
assert.NoError(err)
|
||||
|
||||
err = removeRxRateLimiter(endpoint, currentNS.Path())
|
||||
assert.NoError(err)
|
||||
|
||||
err = removeTCFiltering(endpoint)
|
||||
assert.NoError(err)
|
||||
|
||||
// Remove the veth created for testing.
|
||||
err = netHandle.LinkDel(link)
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
func TestTxRateLimiter(t *testing.T) {
|
||||
if tc.NotValid(ktu.NeedRoot()) {
|
||||
t.Skip(testDisabledAsNonRoot)
|
||||
}
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
netHandle, err := netlink.NewHandle()
|
||||
assert.NoError(err)
|
||||
defer netHandle.Delete()
|
||||
|
||||
// Create a test veth interface.
|
||||
vethName := "foo"
|
||||
veth := &netlink.Veth{LinkAttrs: netlink.LinkAttrs{Name: vethName, TxQLen: 200, MTU: 1400}, PeerName: "bar"}
|
||||
|
||||
err = netlink.LinkAdd(veth)
|
||||
assert.NoError(err)
|
||||
|
||||
endpoint, err := createVethNetworkEndpoint(1, vethName, NetXConnectTCFilterModel)
|
||||
assert.NoError(err)
|
||||
|
||||
link, err := netlink.LinkByName(vethName)
|
||||
assert.NoError(err)
|
||||
|
||||
err = netHandle.LinkSetUp(link)
|
||||
assert.NoError(err)
|
||||
|
||||
err = setupTCFiltering(endpoint, 1, true)
|
||||
assert.NoError(err)
|
||||
|
||||
// 10Mb
|
||||
maxRate := uint64(10000000)
|
||||
err = addTxRateLimiter(endpoint, maxRate)
|
||||
assert.NoError(err)
|
||||
|
||||
currentNS, err := ns.GetCurrentNS()
|
||||
assert.NoError(err)
|
||||
|
||||
err = removeTxRateLimiter(endpoint, currentNS.Path())
|
||||
assert.NoError(err)
|
||||
|
||||
err = removeTCFiltering(endpoint)
|
||||
assert.NoError(err)
|
||||
|
||||
// Remove the veth created for testing.
|
||||
err = netHandle.LinkDel(link)
|
||||
assert.NoError(err)
|
||||
}
|
||||
|
||||
@@ -256,6 +256,8 @@ func (s *Sandbox) dumpConfig(ss *persistapi.SandboxState) {
|
||||
VhostUserStorePath: sconfig.HypervisorConfig.VhostUserStorePath,
|
||||
GuestHookPath: sconfig.HypervisorConfig.GuestHookPath,
|
||||
VMid: sconfig.HypervisorConfig.VMid,
|
||||
RxRateLimiterMaxRate: sconfig.HypervisorConfig.RxRateLimiterMaxRate,
|
||||
TxRateLimiterMaxRate: sconfig.HypervisorConfig.TxRateLimiterMaxRate,
|
||||
}
|
||||
|
||||
if sconfig.AgentType == "kata" {
|
||||
@@ -545,6 +547,8 @@ func loadSandboxConfig(id string) (*SandboxConfig, error) {
|
||||
VhostUserStorePath: hconf.VhostUserStorePath,
|
||||
GuestHookPath: hconf.GuestHookPath,
|
||||
VMid: hconf.VMid,
|
||||
RxRateLimiterMaxRate: hconf.RxRateLimiterMaxRate,
|
||||
TxRateLimiterMaxRate: hconf.TxRateLimiterMaxRate,
|
||||
}
|
||||
|
||||
if savedConf.AgentType == "kata" {
|
||||
|
||||
@@ -179,6 +179,12 @@ type HypervisorConfig struct {
|
||||
// VMid is the id of the VM that created the hypervisor if the VM is created by the factory.
|
||||
// VMid is "" if the hypervisor is not created by the factory.
|
||||
VMid string
|
||||
|
||||
// RxRateLimiterMaxRate is used to control network I/O inbound bandwidth on VM level.
|
||||
RxRateLimiterMaxRate uint64
|
||||
|
||||
// TxRateLimiterMaxRate is used to control network I/O outbound bandwidth on VM level.
|
||||
TxRateLimiterMaxRate uint64
|
||||
}
|
||||
|
||||
// KataAgentConfig is a structure storing information needed
|
||||
|
||||
@@ -231,3 +231,21 @@ func (endpoint *PhysicalEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.VendorDeviceID = s.Physical.VendorDeviceID
|
||||
}
|
||||
}
|
||||
|
||||
// unsupported
|
||||
func (endpoint *PhysicalEndpoint) GetRxRateLimiter() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (endpoint *PhysicalEndpoint) SetRxRateLimiter() error {
|
||||
return fmt.Errorf("rx rate limiter is unsupported for physical endpoint")
|
||||
}
|
||||
|
||||
// unsupported
|
||||
func (endpoint *PhysicalEndpoint) GetTxRateLimiter() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (endpoint *PhysicalEndpoint) SetTxRateLimiter() error {
|
||||
return fmt.Errorf("tx rate limiter is unsupported for physical endpoint")
|
||||
}
|
||||
|
||||
@@ -200,6 +200,12 @@ const (
|
||||
// BlockDeviceCacheNoflush is a sandbox annotation that specifies cache-related options for block devices.
|
||||
// Denotes whether flush requests for the device are ignored.
|
||||
BlockDeviceCacheNoflush = kataAnnotHypervisorPrefix + "block_device_cache_noflush"
|
||||
|
||||
// RxRateLimiterMaxRate is a sandbox annotation that specifies max rate on network I/O inbound bandwidth.
|
||||
RxRateLimiterMaxRate = kataAnnotHypervisorPrefix + "rx_rate_limiter_max_rate"
|
||||
|
||||
// TxRateLimiterMaxRate is a sandbox annotation that specifies max rate on network I/O outbound bandwidth.
|
||||
TxRateLimiterMaxRate = kataAnnotHypervisorPrefix + "tx_rate_limiter_max_rate"
|
||||
)
|
||||
|
||||
// Agent related annotations
|
||||
|
||||
@@ -19,17 +19,17 @@ type TokenBucket struct {
|
||||
|
||||
// The initial size of a token bucket.
|
||||
// Minimum: 0
|
||||
OneTimeBurst *int64 `json:"one_time_burst,omitempty"`
|
||||
OneTimeBurst *uint64 `json:"one_time_burst,omitempty"`
|
||||
|
||||
// The amount of milliseconds it takes for the bucket to refill.
|
||||
// Required: true
|
||||
// Minimum: 0
|
||||
RefillTime *int64 `json:"refill_time"`
|
||||
RefillTime *uint64 `json:"refill_time"`
|
||||
|
||||
// The total number of tokens this bucket can hold.
|
||||
// Required: true
|
||||
// Minimum: 0
|
||||
Size *int64 `json:"size"`
|
||||
Size *uint64 `json:"size"`
|
||||
}
|
||||
|
||||
// Validate validates this token bucket
|
||||
|
||||
@@ -615,17 +615,17 @@ definitions:
|
||||
properties:
|
||||
size:
|
||||
type: integer
|
||||
format: int64
|
||||
format: uint64
|
||||
description: The total number of tokens this bucket can hold.
|
||||
minimum: 0
|
||||
one_time_burst:
|
||||
type: integer
|
||||
format: int64
|
||||
format: uint64
|
||||
description: The initial size of a token bucket.
|
||||
minimum: 0
|
||||
refill_time:
|
||||
type: integer
|
||||
format: int64
|
||||
format: uint64
|
||||
description: The amount of milliseconds it takes for the bucket to refill.
|
||||
minimum: 0
|
||||
|
||||
|
||||
@@ -382,6 +382,10 @@ func addHypervisorConfigOverrides(ocispec specs.Spec, config *vc.SandboxConfig)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := addHypervisporNetworkOverrides(ocispec, config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if value, ok := ocispec.Annotations[vcAnnotations.KernelParams]; ok {
|
||||
if value != "" {
|
||||
params := vc.DeserializeParams(strings.Fields(value))
|
||||
@@ -405,15 +409,6 @@ func addHypervisorConfigOverrides(ocispec specs.Spec, config *vc.SandboxConfig)
|
||||
}
|
||||
}
|
||||
|
||||
if value, ok := ocispec.Annotations[vcAnnotations.DisableVhostNet]; ok {
|
||||
disableVhostNet, err := strconv.ParseBool(value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error parsing annotation for disable_vhost_net: Please specify boolean value 'true|false'")
|
||||
}
|
||||
|
||||
config.HypervisorConfig.DisableVhostNet = disableVhostNet
|
||||
}
|
||||
|
||||
if value, ok := ocispec.Annotations[vcAnnotations.GuestHookPath]; ok {
|
||||
if value != "" {
|
||||
config.HypervisorConfig.GuestHookPath = value
|
||||
@@ -704,6 +699,35 @@ func addHypervisporVirtioFsOverrides(ocispec specs.Spec, sbConfig *vc.SandboxCon
|
||||
return nil
|
||||
}
|
||||
|
||||
func addHypervisporNetworkOverrides(ocispec specs.Spec, sbConfig *vc.SandboxConfig) error {
|
||||
if value, ok := ocispec.Annotations[vcAnnotations.DisableVhostNet]; ok {
|
||||
disableVhostNet, err := strconv.ParseBool(value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error parsing annotation for disable_vhost_net: Please specify boolean value 'true|false'")
|
||||
}
|
||||
|
||||
sbConfig.HypervisorConfig.DisableVhostNet = disableVhostNet
|
||||
}
|
||||
|
||||
if value, ok := ocispec.Annotations[vcAnnotations.RxRateLimiterMaxRate]; ok {
|
||||
rxRateLimiterMaxRate, err := strconv.ParseUint(value, 10, 64)
|
||||
if err != nil || rxRateLimiterMaxRate < 0 {
|
||||
return fmt.Errorf("Error parsing annotation for rx_rate_limiter_max_rate: %v, Please specify an integer greater than or equal to 0", err)
|
||||
}
|
||||
sbConfig.HypervisorConfig.RxRateLimiterMaxRate = rxRateLimiterMaxRate
|
||||
}
|
||||
|
||||
if value, ok := ocispec.Annotations[vcAnnotations.TxRateLimiterMaxRate]; ok {
|
||||
txRateLimiterMaxRate, err := strconv.ParseUint(value, 10, 64)
|
||||
if err != nil || txRateLimiterMaxRate < 0 {
|
||||
return fmt.Errorf("Error parsing annotation for tx_rate_limiter_max_rate: %v, Please specify an integer greater than or equal to 0", err)
|
||||
}
|
||||
sbConfig.HypervisorConfig.TxRateLimiterMaxRate = txRateLimiterMaxRate
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func addRuntimeConfigOverrides(ocispec specs.Spec, sbConfig *vc.SandboxConfig) error {
|
||||
if value, ok := ocispec.Annotations[vcAnnotations.DisableGuestSeccomp]; ok {
|
||||
disableGuestSeccomp, err := strconv.ParseBool(value)
|
||||
|
||||
@@ -791,6 +791,9 @@ func TestAddHypervisorAnnotations(t *testing.T) {
|
||||
ocispec.Annotations[vcAnnotations.HotplugVFIOOnRootBus] = "true"
|
||||
ocispec.Annotations[vcAnnotations.PCIeRootPort] = "2"
|
||||
ocispec.Annotations[vcAnnotations.EntropySource] = "/dev/urandom"
|
||||
// 10Mbit
|
||||
ocispec.Annotations[vcAnnotations.RxRateLimiterMaxRate] = "10000000"
|
||||
ocispec.Annotations[vcAnnotations.TxRateLimiterMaxRate] = "10000000"
|
||||
|
||||
addAnnotations(ocispec, &config)
|
||||
assert.Equal(config.HypervisorConfig.NumVCPUs, uint32(1))
|
||||
@@ -823,6 +826,8 @@ func TestAddHypervisorAnnotations(t *testing.T) {
|
||||
assert.Equal(config.HypervisorConfig.HotplugVFIOOnRootBus, true)
|
||||
assert.Equal(config.HypervisorConfig.PCIeRootPort, uint32(2))
|
||||
assert.Equal(config.HypervisorConfig.EntropySource, "/dev/urandom")
|
||||
assert.Equal(config.HypervisorConfig.RxRateLimiterMaxRate, uint64(10000000))
|
||||
assert.Equal(config.HypervisorConfig.TxRateLimiterMaxRate, uint64(10000000))
|
||||
|
||||
// In case an absurdly large value is provided, the config value is not overridden
|
||||
ocispec.Annotations[vcAnnotations.DefaultVCPUs] = "655536"
|
||||
|
||||
@@ -2263,3 +2263,7 @@ func (q *qemu) check() error {
|
||||
func (q *qemu) generateSocket(id string, useVsock bool) (interface{}, error) {
|
||||
return generateVMSocket(id, useVsock, q.store.RunVMStoragePath())
|
||||
}
|
||||
|
||||
func (q *qemu) isRateLimiterBuiltin() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ type TapEndpoint struct {
|
||||
EndpointProperties NetworkInfo
|
||||
EndpointType EndpointType
|
||||
PCIAddr string
|
||||
RxRateLimiter bool
|
||||
TxRateLimiter bool
|
||||
}
|
||||
|
||||
// Properties returns the properties of the tap interface.
|
||||
@@ -207,3 +209,21 @@ func (endpoint *TapEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.TapInterface = *tapif
|
||||
}
|
||||
}
|
||||
|
||||
func (endpoint *TapEndpoint) GetRxRateLimiter() bool {
|
||||
return endpoint.RxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *TapEndpoint) SetRxRateLimiter() error {
|
||||
endpoint.RxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (endpoint *TapEndpoint) GetTxRateLimiter() bool {
|
||||
return endpoint.TxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *TapEndpoint) SetTxRateLimiter() error {
|
||||
endpoint.TxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -23,6 +23,8 @@ type TuntapEndpoint struct {
|
||||
EndpointProperties NetworkInfo
|
||||
EndpointType EndpointType
|
||||
PCIAddr string
|
||||
RxRateLimiter bool
|
||||
TxRateLimiter bool
|
||||
}
|
||||
|
||||
// Properties returns the properties of the tap interface.
|
||||
@@ -212,3 +214,21 @@ func (endpoint *TuntapEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.TuntapInterface = *tuntapif
|
||||
}
|
||||
}
|
||||
|
||||
func (endpoint *TuntapEndpoint) GetRxRateLimiter() bool {
|
||||
return endpoint.RxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *TuntapEndpoint) SetRxRateLimiter() error {
|
||||
endpoint.RxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (endpoint *TuntapEndpoint) GetTxRateLimiter() bool {
|
||||
return endpoint.TxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *TuntapEndpoint) SetTxRateLimiter() error {
|
||||
endpoint.TxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -29,6 +29,9 @@ const MaxSocketPathLen = 107
|
||||
// VHostVSockDevicePath path to vhost-vsock device
|
||||
var VHostVSockDevicePath = "/dev/vhost-vsock"
|
||||
|
||||
// sysModuleDir is the directory where system modules are located.
|
||||
var sysModuleDir = "/sys/module"
|
||||
|
||||
// FileCopy copies files from srcPath to dstPath
|
||||
func FileCopy(srcPath, dstPath string) error {
|
||||
if srcPath == "" {
|
||||
@@ -235,6 +238,25 @@ func SupportsVsocks() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// SupportsIfb returns true if ifb are supported, otherwise false
|
||||
func SupportsIfb() (bool, error) {
|
||||
ifbModule := "ifb"
|
||||
// First, check to see if the ifb module is already loaded
|
||||
path := filepath.Join(sysModuleDir, ifbModule)
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Try to load the ifb module.
|
||||
// When inserting the ifb module, tell it the number of virtual interfaces you need, here, it's zero.
|
||||
// The default is 2.
|
||||
cmd := exec.Command("modprobe", ifbModule, "numifbs=0")
|
||||
if output, err := cmd.CombinedOutput(); err != nil {
|
||||
return false, fmt.Errorf("modprobe insert ifb module failed: %s", string(output))
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// StartCmd pointer to a function to start a command.
|
||||
// Defined this way to allow mock testing.
|
||||
var StartCmd = func(c *exec.Cmd) error {
|
||||
|
||||
@@ -18,6 +18,8 @@ type VethEndpoint struct {
|
||||
EndpointProperties NetworkInfo
|
||||
EndpointType EndpointType
|
||||
PCIAddr string
|
||||
RxRateLimiter bool
|
||||
TxRateLimiter bool
|
||||
}
|
||||
|
||||
func createVethNetworkEndpoint(idx int, ifName string, interworkingModel NetInterworkingModel) (*VethEndpoint, error) {
|
||||
@@ -162,3 +164,21 @@ func (endpoint *VethEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.NetPair = *netpair
|
||||
}
|
||||
}
|
||||
|
||||
func (endpoint *VethEndpoint) GetRxRateLimiter() bool {
|
||||
return endpoint.RxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *VethEndpoint) SetRxRateLimiter() error {
|
||||
endpoint.RxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (endpoint *VethEndpoint) GetTxRateLimiter() bool {
|
||||
return endpoint.TxRateLimiter
|
||||
}
|
||||
|
||||
func (endpoint *VethEndpoint) SetTxRateLimiter() error {
|
||||
endpoint.TxRateLimiter = true
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -169,3 +169,21 @@ func (endpoint *VhostUserEndpoint) load(s persistapi.NetworkEndpoint) {
|
||||
endpoint.PCIAddr = s.VhostUser.PCIAddr
|
||||
}
|
||||
}
|
||||
|
||||
// unsupported
|
||||
func (endpoint *VhostUserEndpoint) GetRxRateLimiter() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (endpoint *VhostUserEndpoint) SetRxRateLimiter() error {
|
||||
return fmt.Errorf("rx rate limiter is unsupported for vhost user endpoint")
|
||||
}
|
||||
|
||||
// unsupported
|
||||
func (endpoint *VhostUserEndpoint) GetTxRateLimiter() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (endpoint *VhostUserEndpoint) SetTxRateLimiter() error {
|
||||
return fmt.Errorf("tx rate limiter is unsupported for vhost user endpoint")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user