This commit adds a new gRPC function, Status, to CacheService: the VMCache server replies with its current status. "kata-runtime factory status" calls gRPC Status to get the status of the VMCache server and outputs it when VMCache is enabled.

Fixes: #1395

Signed-off-by: Hui Zhu <teawater@hyper.sh>
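With VMCache enabled, "kata-runtime factory status" dials the configured VMCacheEndpoint Unix socket, calls the new Status RPC, and prints the server pid followed by one line per cached VM. A sketch of the resulting output, with illustrative pid and resource values:

    VM cache server pid = 12345
    VM pid = 12346 Cpu = 1 Memory = 2048MiB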
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//

package main

import (
	"context"
	"fmt"
	"net"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"github.com/gogo/protobuf/types"
	pb "github.com/kata-containers/runtime/protocols/cache"
	vc "github.com/kata-containers/runtime/virtcontainers"
	vf "github.com/kata-containers/runtime/virtcontainers/factory"
	"github.com/kata-containers/runtime/virtcontainers/pkg/oci"
	"github.com/pkg/errors"
	"github.com/urfave/cli"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc"
)

var factorySubCmds = []cli.Command{
	initFactoryCommand,
	destroyFactoryCommand,
	statusFactoryCommand,
}

var factoryCLICommand = cli.Command{
	Name:        "factory",
	Usage:       "manage vm factory",
	Subcommands: factorySubCmds,
	Action: func(context *cli.Context) {
		cli.ShowSubcommandHelp(context)
	},
}
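
// cacheServer wraps the VM factory and implements the CacheService
// gRPC interface (Config, GetBaseVM, Quit and Status) that the VMCache
// server exposes over a Unix socket.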
type cacheServer struct {
	rpc     *grpc.Server
	factory vc.Factory
	done    chan struct{}
}
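
// jsonVMConfig caches the factory config in gRPC form so that it is
// only converted once.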
var jsonVMConfig *pb.GrpcVMConfig

// Config requests the base factory config and converts it to the gRPC protocol.
func (s *cacheServer) Config(ctx context.Context, empty *types.Empty) (*pb.GrpcVMConfig, error) {
	if jsonVMConfig == nil {
		config := s.factory.Config()

		var err error
		jsonVMConfig, err = config.ToGrpc()
		if err != nil {
			return nil, err
		}
	}

	return jsonVMConfig, nil
}

// GetBaseVM requests a paused VM from the factory and converts it to the gRPC protocol.
func (s *cacheServer) GetBaseVM(ctx context.Context, empty *types.Empty) (*pb.GrpcVM, error) {
	config := s.factory.Config()

	vm, err := s.factory.GetBaseVM(ctx, config)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to GetBaseVM")
	}

	return vm.ToGrpc(config)
}
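
// quit gracefully stops the gRPC server and signals the main loop to exit.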
func (s *cacheServer) quit() {
	s.rpc.GracefulStop()
	close(s.done)
}

// Quit stops the VMCache server after 1 second. The delay gives the
// gRPC response time to reach the client before the server shuts down.
func (s *cacheServer) Quit(ctx context.Context, empty *types.Empty) (*types.Empty, error) {
	go func() {
		kataLog.Info("VM cache server will stop after 1 second")
		time.Sleep(time.Second)
		s.quit()
	}()
	return nil, nil
}
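
// Status replies with the status of the VMCache server: its pid and
// the status (pid, CPU, memory) of each cached VM.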
func (s *cacheServer) Status(ctx context.Context, empty *types.Empty) (*pb.GrpcStatus, error) {
	stat := pb.GrpcStatus{
		Pid:      int64(os.Getpid()),
		Vmstatus: s.factory.GetVMStatus(),
	}
	return &stat, nil
}
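
// getUnixListener creates the Unix socket for the VMCache endpoint,
// refusing to reuse an existing socket file and restricting the
// socket's permissions to its owner (0600).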
func getUnixListener(path string) (net.Listener, error) {
	err := os.MkdirAll(filepath.Dir(path), 0755)
	if err != nil {
		return nil, err
	}
	_, err = os.Stat(path)
	if err == nil {
		return nil, fmt.Errorf("%s already exists. Please stop the running VMCache server and remove %s", path, path)
	} else if !os.IsNotExist(err) {
		return nil, err
	}
	l, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}
	if err = os.Chmod(path, 0600); err != nil {
		l.Close()
		return nil, err
	}
	return l, nil
}
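
// handledSignals are the signals the VMCache server listens for.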
var handledSignals = []os.Signal{
	syscall.SIGTERM,
	syscall.SIGINT,
	syscall.SIGPIPE,
}
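
// handleSignals stops the server on any handled signal except SIGPIPE,
// which is ignored.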
func handleSignals(s *cacheServer, signals chan os.Signal) {
	s.done = make(chan struct{}, 1)
	go func() {
		for {
			sig := <-signals
			kataLog.WithField("signal", sig).Debug("received signal")
			switch sig {
			case unix.SIGPIPE:
				continue
			default:
				s.quit()
				return
			}
		}
	}()
}

var initFactoryCommand = cli.Command{
	Name:  "init",
	Usage: "initialize a VM factory based on kata-runtime configuration",
	Action: func(c *cli.Context) error {
		ctx, err := cliContextToContext(c)
		if err != nil {
			return err
		}

		runtimeConfig, ok := c.App.Metadata["runtimeConfig"].(oci.RuntimeConfig)
		if !ok {
			return errors.New("invalid runtime config")
		}

		if runtimeConfig.FactoryConfig.VMCacheNumber > 0 {
			// VMCache mode: run as a long-lived server that holds the
			// factory and serves it over gRPC.
			factoryConfig := vf.Config{
				Template: runtimeConfig.FactoryConfig.Template,
				Cache:    runtimeConfig.FactoryConfig.VMCacheNumber,
				VMCache:  true,
				VMConfig: vc.VMConfig{
					HypervisorType:   runtimeConfig.HypervisorType,
					HypervisorConfig: runtimeConfig.HypervisorConfig,
					AgentType:        runtimeConfig.AgentType,
					AgentConfig:      runtimeConfig.AgentConfig,
					ProxyType:        runtimeConfig.ProxyType,
					ProxyConfig:      runtimeConfig.ProxyConfig,
				},
			}
			f, err := vf.NewFactory(ctx, factoryConfig, false)
			if err != nil {
				return err
			}
			defer f.CloseFactory(ctx)

			s := &cacheServer{
				rpc:     grpc.NewServer(),
				factory: f,
			}
			pb.RegisterCacheServiceServer(s.rpc, s)

			l, err := getUnixListener(runtimeConfig.FactoryConfig.VMCacheEndpoint)
			if err != nil {
				return err
			}
			defer l.Close()

			signals := make(chan os.Signal, 8)
			handleSignals(s, signals)
			signal.Notify(signals, handledSignals...)

			kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server start")
			s.rpc.Serve(l)

			<-s.done

			kataLog.WithField("endpoint", runtimeConfig.FactoryConfig.VMCacheEndpoint).Info("VM cache server stop")
			return nil
		}

		if runtimeConfig.FactoryConfig.Template {
			// Template mode: create the template VM and exit.
			factoryConfig := vf.Config{
				Template: true,
				VMConfig: vc.VMConfig{
					HypervisorType:   runtimeConfig.HypervisorType,
					HypervisorConfig: runtimeConfig.HypervisorConfig,
					AgentType:        runtimeConfig.AgentType,
					AgentConfig:      runtimeConfig.AgentConfig,
					ProxyType:        runtimeConfig.ProxyType,
				},
			}
			kataLog.WithField("factory", factoryConfig).Info("create vm factory")
			_, err := vf.NewFactory(ctx, factoryConfig, false)
			if err != nil {
				kataLog.WithError(err).Error("create vm factory failed")
				return err
			}
			fmt.Fprintln(defaultOutputFile, "vm factory initialized")
		} else {
			kataLog.Error("vm factory is not enabled")
			fmt.Fprintln(defaultOutputFile, "vm factory is not enabled")
		}

		return nil
	},
}

var destroyFactoryCommand = cli.Command{
	Name:  "destroy",
	Usage: "destroy the VM factory",
	Action: func(c *cli.Context) error {
		ctx, err := cliContextToContext(c)
		if err != nil {
			return err
		}

		runtimeConfig, ok := c.App.Metadata["runtimeConfig"].(oci.RuntimeConfig)
		if !ok {
			return errors.New("invalid runtime config")
		}

		if runtimeConfig.FactoryConfig.VMCacheNumber > 0 {
			conn, err := grpc.Dial(fmt.Sprintf("unix://%s", runtimeConfig.FactoryConfig.VMCacheEndpoint), grpc.WithInsecure())
			if err != nil {
				return errors.Wrapf(err, "failed to connect %q", runtimeConfig.FactoryConfig.VMCacheEndpoint)
			}
			defer conn.Close()
			_, err = pb.NewCacheServiceClient(conn).Quit(ctx, &types.Empty{})
			if err != nil {
				return errors.Wrapf(err, "failed to call gRPC Quit")
			}
			// Wait for the VMCache server to stop.
			time.Sleep(time.Second)
		} else if runtimeConfig.FactoryConfig.Template {
			factoryConfig := vf.Config{
				Template: true,
				VMConfig: vc.VMConfig{
					HypervisorType:   runtimeConfig.HypervisorType,
					HypervisorConfig: runtimeConfig.HypervisorConfig,
					AgentType:        runtimeConfig.AgentType,
					AgentConfig:      runtimeConfig.AgentConfig,
				},
			}
			kataLog.WithField("factory", factoryConfig).Info("load vm factory")
			f, err := vf.NewFactory(ctx, factoryConfig, true)
			if err != nil {
				kataLog.WithError(err).Error("load vm factory failed")
				// ignore error
			} else {
				f.CloseFactory(ctx)
			}
		}
		fmt.Fprintln(defaultOutputFile, "vm factory destroyed")
		return nil
	},
}

var statusFactoryCommand = cli.Command{
	Name:  "status",
	Usage: "query the status of the VM factory",
	Action: func(c *cli.Context) error {
		ctx, err := cliContextToContext(c)
		if err != nil {
			return err
		}

		runtimeConfig, ok := c.App.Metadata["runtimeConfig"].(oci.RuntimeConfig)
		if !ok {
			return errors.New("invalid runtime config")
		}

		if runtimeConfig.FactoryConfig.VMCacheNumber > 0 {
			conn, err := grpc.Dial(fmt.Sprintf("unix://%s", runtimeConfig.FactoryConfig.VMCacheEndpoint), grpc.WithInsecure())
			if err != nil {
				fmt.Fprintln(defaultOutputFile, errors.Wrapf(err, "failed to connect %q", runtimeConfig.FactoryConfig.VMCacheEndpoint))
			} else {
				defer conn.Close()
				status, err := pb.NewCacheServiceClient(conn).Status(ctx, &types.Empty{})
				if err != nil {
					fmt.Fprintln(defaultOutputFile, errors.Wrapf(err, "failed to call gRPC Status"))
				} else {
					fmt.Fprintf(defaultOutputFile, "VM cache server pid = %d\n", status.Pid)
					for _, vs := range status.Vmstatus {
						fmt.Fprintf(defaultOutputFile, "VM pid = %d Cpu = %d Memory = %dMiB\n", vs.Pid, vs.Cpu, vs.Memory)
					}
				}
			}
		}
		if runtimeConfig.FactoryConfig.Template {
			factoryConfig := vf.Config{
				Template: true,
				VMConfig: vc.VMConfig{
					HypervisorType:   runtimeConfig.HypervisorType,
					HypervisorConfig: runtimeConfig.HypervisorConfig,
					AgentType:        runtimeConfig.AgentType,
					AgentConfig:      runtimeConfig.AgentConfig,
				},
			}
			kataLog.WithField("factory", factoryConfig).Info("load vm factory")
			_, err := vf.NewFactory(ctx, factoryConfig, true)
			if err != nil {
				fmt.Fprintln(defaultOutputFile, "vm factory is off")
			} else {
				fmt.Fprintln(defaultOutputFile, "vm factory is on")
			}
		} else {
			fmt.Fprintln(defaultOutputFile, "vm factory is not enabled")
		}
		return nil
	},
}