kata 2.0: delete use_vsock option and proxy abstraction

With kata containers moving to 2.0, (hybrid-)vsock will be the only
way to directly communicate between host and agent.
And kata-proxy as additional component to handle the multiplexing on
serial port is also no longer needed.
Cleaning up related unit tests, and also add another mock socket type
`MockHybridVSock` to deal with ttrpc-based hybrid-vsock mock server.

Fixes: #389

Signed-off-by: Penny Zheng penny.zheng@arm.com
This commit is contained in:
Penny Zheng
2020-07-13 02:21:10 +00:00
parent c052e46c66
commit 1099a28830
68 changed files with 427 additions and 2374 deletions

View File

@@ -18,22 +18,21 @@ import (
"strings"
"time"
"github.com/hashicorp/yamux"
"github.com/mdlayher/vsock"
// opentracing "github.com/opentracing/opentracing-go"
// opentracing "github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
// "google.golang.org/grpc"
// "google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
agentgrpc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/grpc"
"github.com/containerd/ttrpc"
agentgrpc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/grpc"
)
const (
UnixSocketScheme = "unix"
VSockSocketScheme = "vsock"
HybridVSockScheme = "hvsock"
VSockSocketScheme = "vsock"
HybridVSockScheme = "hvsock"
MockHybridVSockScheme = "mock"
)
var defaultDialTimeout = 15 * time.Second
@@ -52,36 +51,8 @@ var agentClientLog = logrus.WithFields(agentClientFields)
// AgentClient is an agent gRPC client connection wrapper for agentgrpc.AgentServiceClient
type AgentClient struct {
AgentServiceClient agentgrpc.AgentServiceService
HealthClient agentgrpc.HealthService
conn *ttrpc.Client
}
type yamuxSessionStream struct {
net.Conn
session *yamux.Session
}
func (y *yamuxSessionStream) Close() error {
waitCh := y.session.CloseChan()
timeout := time.NewTimer(defaultCloseTimeout)
if err := y.Conn.Close(); err != nil {
return err
}
if err := y.session.Close(); err != nil {
return err
}
// block until session is really closed
select {
case <-waitCh:
timeout.Stop()
case <-timeout.C:
return fmt.Errorf("timeout waiting for session close")
}
return nil
HealthClient agentgrpc.HealthService
conn *ttrpc.Client
}
type dialer func(string, time.Duration) (net.Conn, error)
@@ -89,56 +60,55 @@ type dialer func(string, time.Duration) (net.Conn, error)
// NewAgentClient creates a new agent gRPC client and handles both unix and vsock addresses.
//
// Supported sock address formats are:
// - unix://<unix socket path>
// - vsock://<cid>:<port>
// - <unix socket path>
// - hvsock://<path>:<port>. Firecracker implements the virtio-vsock device
// model, and mediates communication between AF_UNIX sockets (on the host end)
// and AF_VSOCK sockets (on the guest end).
func NewAgentClient(ctx context.Context, sock string, enableYamux bool) (*AgentClient, error) {
// - mock://<path>. just for test use.
func NewAgentClient(ctx context.Context, sock string) (*AgentClient, error) {
grpcAddr, parsedAddr, err := parse(sock)
if err != nil {
return nil, err
}
var conn net.Conn
var d dialer
d = agentDialer(parsedAddr, enableYamux)
conn, err = d(grpcAddr, defaultDialTimeout)
if err != nil {
return nil, err
}
/*
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux)))
var tracer opentracing.Tracer
span := opentracing.SpanFromContext(ctx)
// If the context contains a trace span, trace all client comms
if span != nil {
tracer = span.Tracer()
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
dialOpts = append(dialOpts,
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
}
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...)
var conn net.Conn
var d dialer
d = agentDialer(parsedAddr)
conn, err = d(grpcAddr, defaultDialTimeout)
if err != nil {
return nil, err
}
*/
/*
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux)))
var tracer opentracing.Tracer
span := opentracing.SpanFromContext(ctx)
// If the context contains a trace span, trace all client comms
if span != nil {
tracer = span.Tracer()
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
dialOpts = append(dialOpts,
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
}
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...)
if err != nil {
return nil, err
}
*/
client := ttrpc.NewClient(conn)
return &AgentClient{
AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
HealthClient: agentgrpc.NewHealthClient(client),
conn: client,
AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
HealthClient: agentgrpc.NewHealthClient(client),
conn: client,
}, nil
}
@@ -176,17 +146,6 @@ func parse(sock string) (string, *url.URL, error) {
return "", nil, grpcStatus.Errorf(codes.InvalidArgument, "Invalid vsock port: %s", sock)
}
grpcAddr = VSockSocketScheme + ":" + addr.Host
case UnixSocketScheme:
fallthrough
case "":
if (addr.Host == "" && addr.Path == "") || addr.Port() != "" {
return "", nil, grpcStatus.Errorf(codes.InvalidArgument, "Invalid unix scheme: %s", sock)
}
if addr.Host == "" {
grpcAddr = UnixSocketScheme + ":///" + addr.Path
} else {
grpcAddr = UnixSocketScheme + ":///" + addr.Host + "/" + addr.Path
}
case HybridVSockScheme:
if addr.Path == "" {
return "", nil, grpcStatus.Errorf(codes.InvalidArgument, "Invalid hybrid vsock scheme: %s", sock)
@@ -202,6 +161,13 @@ func parse(sock string) (string, *url.URL, error) {
}
hybridVSockPort = uint32(port)
grpcAddr = HybridVSockScheme + ":" + hvsocket[0]
// just for tests use.
case MockHybridVSockScheme:
if addr.Path == "" {
return "", nil, grpcStatus.Errorf(codes.InvalidArgument, "Invalid mock hybrid vsock scheme: %s", sock)
}
// e.g. mock:/tmp/socket
grpcAddr = MockHybridVSockScheme + ":" + addr.Path
default:
return "", nil, grpcStatus.Errorf(codes.InvalidArgument, "Invalid scheme: %s", sock)
}
@@ -209,94 +175,17 @@ func parse(sock string) (string, *url.URL, error) {
return grpcAddr, addr, nil
}
// This function is meant to run in a go routine since it will send ping
// commands every second. It behaves as a heartbeat to maintain a proper
// communication state with the Yamux server in the agent.
func heartBeat(session *yamux.Session) {
if session == nil {
return
}
for {
if session.IsClosed() {
break
}
session.Ping()
// 1 Hz heartbeat
time.Sleep(time.Second)
}
}
func agentDialer(addr *url.URL, enableYamux bool) dialer {
var d dialer
func agentDialer(addr *url.URL) dialer {
switch addr.Scheme {
case VSockSocketScheme:
d = vsockDialer
return vsockDialer
case HybridVSockScheme:
d = HybridVSockDialer
case UnixSocketScheme:
fallthrough
return HybridVSockDialer
case MockHybridVSockScheme:
return MockHybridVSockDialer
default:
d = unixDialer
return nil
}
if !enableYamux {
return d
}
// yamux dialer
return func(sock string, timeout time.Duration) (net.Conn, error) {
conn, err := d(sock, timeout)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
conn.Close()
}
}()
var session *yamux.Session
sessionConfig := yamux.DefaultConfig()
// Disable keepAlive since we don't know how much time a container can be paused
sessionConfig.EnableKeepAlive = false
sessionConfig.ConnectionWriteTimeout = time.Second
session, err = yamux.Client(conn, sessionConfig)
if err != nil {
return nil, err
}
// Start the heartbeat in a separate go routine
go heartBeat(session)
var stream net.Conn
stream, err = session.Open()
if err != nil {
return nil, err
}
y := &yamuxSessionStream{
Conn: stream.(net.Conn),
session: session,
}
return y, nil
}
}
func unixDialer(sock string, timeout time.Duration) (net.Conn, error) {
if strings.HasPrefix(sock, "unix:") {
sock = strings.Trim(sock, "unix:")
}
dialFunc := func() (net.Conn, error) {
return net.DialTimeout("unix", sock, timeout)
}
timeoutErr := grpcStatus.Errorf(codes.DeadlineExceeded, "timed out connecting to unix socket %s", sock)
return commonDialer(timeout, dialFunc, timeoutErr)
}
func parseGrpcVsockAddr(sock string) (uint32, uint32, error) {
@@ -471,3 +360,17 @@ func HybridVSockDialer(sock string, timeout time.Duration) (net.Conn, error) {
timeoutErr := grpcStatus.Errorf(codes.DeadlineExceeded, "timed out connecting to hybrid vsocket %s", sock)
return commonDialer(timeout, dialFunc, timeoutErr)
}
// just for tests use.
func MockHybridVSockDialer(sock string, timeout time.Duration) (net.Conn, error) {
if strings.HasPrefix(sock, "mock:") {
sock = strings.TrimPrefix(sock, "mock:")
}
dialFunc := func() (net.Conn, error) {
return net.DialTimeout("unix", sock, timeout)
}
timeoutErr := grpcStatus.Errorf(codes.DeadlineExceeded, "timed out connecting to mock hybrid vsocket %s", sock)
return commonDialer(timeout, dialFunc, timeoutErr)
}