Merge pull request #1969 from liubin/feature/1968-pass-span-context-to-agent

Pass span context from runtime to agent to get a full trace #1968
This commit is contained in:
Fupan Li
2021-07-03 09:31:02 +08:00
committed by GitHub
11 changed files with 253 additions and 79 deletions

View File

@@ -163,7 +163,7 @@ type agent interface {
configure(ctx context.Context, h hypervisor, id, sharePath string, config KataAgentConfig) error
// configureFromGrpc will update agent settings based on provided arguments which from Grpc
configureFromGrpc(h hypervisor, id string, config KataAgentConfig) error
configureFromGrpc(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error
// reseedRNG will reseed the guest random number generator
reseedRNG(ctx context.Context, data []byte) error

View File

@@ -32,6 +32,7 @@ import (
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/types"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/label"
otelLabel "go.opentelemetry.io/otel/label"
otelTrace "go.opentelemetry.io/otel/trace"
"github.com/gogo/protobuf/proto"
@@ -373,17 +374,25 @@ func (k *kataAgent) capabilities() types.Capabilities {
return caps
}
func (k *kataAgent) internalConfigure(h hypervisor, id string, config KataAgentConfig) error {
func (k *kataAgent) internalConfigure(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error {
span, _ := k.trace(ctx, "configure")
defer span.End()
var err error
if k.vmSocket, err = h.generateSocket(id); err != nil {
return err
}
k.keepConn = config.LongLiveConn
span.SetAttributes(otelLabel.Any("socket", k.vmSocket))
return nil
}
func (k *kataAgent) setupSandboxBindMounts(sandbox *Sandbox) (err error) {
func (k *kataAgent) setupSandboxBindMounts(ctx context.Context, sandbox *Sandbox) (err error) {
span, ctx := k.trace(ctx, "setupSandboxBindMounts")
defer span.End()
if len(sandbox.config.SandboxBindMounts) == 0 {
return nil
}
@@ -412,13 +421,13 @@ func (k *kataAgent) setupSandboxBindMounts(sandbox *Sandbox) (err error) {
for _, m := range sandbox.config.SandboxBindMounts {
mountDest := filepath.Join(sandboxMountDir, filepath.Base(m))
// bind-mount each sandbox mount that's defined into the sandbox mounts dir
if err := bindMount(context.Background(), m, mountDest, true, "private"); err != nil {
if err := bindMount(ctx, m, mountDest, true, "private"); err != nil {
return fmt.Errorf("Mounting sandbox directory: %v to %v: %w", m, mountDest, err)
}
mountedList = append(mountedList, mountDest)
mountDest = filepath.Join(sandboxShareDir, filepath.Base(m))
if err := remountRo(context.Background(), mountDest); err != nil {
if err := remountRo(ctx, mountDest); err != nil {
return fmt.Errorf("remount sandbox directory: %v to %v: %w", m, mountDest, err)
}
@@ -454,7 +463,10 @@ func (k *kataAgent) cleanupSandboxBindMounts(sandbox *Sandbox) error {
}
func (k *kataAgent) configure(ctx context.Context, h hypervisor, id, sharePath string, config KataAgentConfig) error {
err := k.internalConfigure(h, id, config)
span, ctx := k.trace(ctx, "configure")
defer span.End()
err := k.internalConfigure(ctx, h, id, config)
if err != nil {
return err
}
@@ -495,11 +507,14 @@ func (k *kataAgent) configure(ctx context.Context, h hypervisor, id, sharePath s
return h.addDevice(ctx, sharedVolume, fsDev)
}
func (k *kataAgent) configureFromGrpc(h hypervisor, id string, config KataAgentConfig) error {
return k.internalConfigure(h, id, config)
func (k *kataAgent) configureFromGrpc(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error {
return k.internalConfigure(ctx, h, id, config)
}
func (k *kataAgent) setupSharedPath(ctx context.Context, sandbox *Sandbox) (err error) {
span, ctx := k.trace(ctx, "setupSharedPath")
defer span.End()
// create shared path structure
sharePath := getSharePath(sandbox.id)
mountPath := getMountPath(sandbox.id)
@@ -523,7 +538,7 @@ func (k *kataAgent) setupSharedPath(ctx context.Context, sandbox *Sandbox) (err
}()
// Setup sandbox bindmounts, if specified:
if err = k.setupSandboxBindMounts(sandbox); err != nil {
if err = k.setupSandboxBindMounts(ctx, sandbox); err != nil {
return err
}
@@ -2019,18 +2034,18 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
}
}
func (k *kataAgent) getReqContext(reqName string) (ctx context.Context, cancel context.CancelFunc) {
ctx = context.Background()
func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) {
newCtx = ctx
switch reqName {
case grpcWaitProcessRequest, grpcGetOOMEventRequest:
// Wait and GetOOMEvent have no timeout
case grpcCheckRequest:
ctx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
newCtx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
default:
ctx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
newCtx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
}
return ctx, cancel
return newCtx, cancel
}
func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) {
@@ -2049,7 +2064,7 @@ func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (inter
return nil, errors.New("Invalid request type")
}
message := request.(proto.Message)
ctx, cancel := k.getReqContext(msgName)
ctx, cancel := k.getReqContext(spanCtx, msgName)
if cancel != nil {
defer cancel()
}

View File

@@ -1279,7 +1279,7 @@ func TestSandboxBindMount(t *testing.T) {
defer syscall.Unmount(sharePath, syscall.MNT_DETACH|UmountNoFollow)
// Test the function. We expect it to succeed and for the mount to exist
err = k.setupSandboxBindMounts(sandbox)
err = k.setupSandboxBindMounts(context.Background(), sandbox)
assert.NoError(err)
// Test the cleanup function. We expect it to succeed for the mount to be removed.
@@ -1303,9 +1303,9 @@ func TestSandboxBindMount(t *testing.T) {
// We expect cleanup to fail on the first time, since it cannot remove the sandbox-bindmount directory because
// there are leftover mounts. If we run it a second time, however, it should succeed since it'll remove the
// second set of mounts:
err = k.setupSandboxBindMounts(sandbox)
err = k.setupSandboxBindMounts(context.Background(), sandbox)
assert.NoError(err)
err = k.setupSandboxBindMounts(sandbox)
err = k.setupSandboxBindMounts(context.Background(), sandbox)
assert.NoError(err)
// Test the cleanup function. We expect it to succeed for the mount to be removed.
err = k.cleanupSandboxBindMounts(sandbox)
@@ -1317,7 +1317,7 @@ func TestSandboxBindMount(t *testing.T) {
// Now, let's setup the sandbox bindmount to fail, and verify that no mounts are left behind
//
sandbox.config.SandboxBindMounts = append(sandbox.config.SandboxBindMounts, "oh-nos")
err = k.setupSandboxBindMounts(sandbox)
err = k.setupSandboxBindMounts(context.Background(), sandbox)
assert.Error(err)
// Verify there aren't any mounts left behind
stat = syscall.Stat_t{}

View File

@@ -176,7 +176,7 @@ func (n *mockAgent) configure(ctx context.Context, h hypervisor, id, sharePath s
return nil
}
func (n *mockAgent) configureFromGrpc(h hypervisor, id string, config KataAgentConfig) error {
func (n *mockAgent) configureFromGrpc(ctx context.Context, h hypervisor, id string, config KataAgentConfig) error {
return nil
}

View File

@@ -18,6 +18,7 @@ import (
merr "github.com/hashicorp/go-multierror"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils"
"github.com/sirupsen/logrus"
otelLabel "go.opentelemetry.io/otel/label"
)
// DefaultShmSize is the default shm size to be used in case host
@@ -258,11 +259,13 @@ func moveMount(ctx context.Context, source, destination string) error {
func bindMount(ctx context.Context, source, destination string, readonly bool, pgtypes string) error {
span, _ := trace(ctx, "bindMount")
defer span.End()
span.SetAttributes(otelLabel.String("source", source), otelLabel.String("destination", destination))
absSource, destination, err := evalMountPath(source, destination)
if err != nil {
return err
}
span.SetAttributes(otelLabel.String("source_after_eval", absSource))
if err := syscall.Mount(absSource, destination, "bind", syscall.MS_BIND, ""); err != nil {
return fmt.Errorf("Could not bind mount %v to %v: %v", absSource, destination, err)
@@ -291,10 +294,15 @@ func bindMount(ctx context.Context, source, destination string, readonly bool, p
// The mountflags should match the values used in the original mount() call,
// except for those parameters that you are trying to change.
func remount(ctx context.Context, mountflags uintptr, src string) error {
span, _ := trace(ctx, "remount")
defer span.End()
span.SetAttributes(otelLabel.String("source", src))
absSrc, err := filepath.EvalSymlinks(src)
if err != nil {
return fmt.Errorf("Could not resolve symlink for %s", src)
}
span.SetAttributes(otelLabel.String("source_after_eval", absSrc))
if err := syscall.Mount(absSrc, absSrc, "", syscall.MS_REMOUNT|mountflags, ""); err != nil {
return fmt.Errorf("remount %s failed: %v", absSrc, err)
@@ -353,6 +361,7 @@ func isSymlink(path string) bool {
func bindUnmountContainerRootfs(ctx context.Context, sharedDir, cID string) error {
span, _ := trace(ctx, "bindUnmountContainerRootfs")
defer span.End()
span.SetAttributes(otelLabel.String("shared_dir", sharedDir), otelLabel.String("container_id", cID))
rootfsDest := filepath.Join(sharedDir, cID, rootfsDir)
if isSymlink(filepath.Join(sharedDir, cID)) || isSymlink(rootfsDest) {
@@ -375,6 +384,7 @@ func bindUnmountContainerRootfs(ctx context.Context, sharedDir, cID string) erro
func bindUnmountAllRootfs(ctx context.Context, sharedDir string, sandbox *Sandbox) error {
span, ctx := trace(ctx, "bindUnmountAllRootfs")
defer span.End()
span.SetAttributes(otelLabel.String("shared_dir", sharedDir), otelLabel.String("sandbox_id", sandbox.id))
var errors *merr.Error
for _, c := range sandbox.containers {

View File

@@ -20,7 +20,11 @@ import (
"github.com/mdlayher/vsock"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
otelLabel "go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcStatus "google.golang.org/grpc/status"
"github.com/containerd/ttrpc"
@@ -80,32 +84,8 @@ func NewAgentClient(ctx context.Context, sock string, timeout uint32) (*AgentCli
if err != nil {
return nil, err
}
/*
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
dialOpts = append(dialOpts, grpc.WithDialer(agentDialer(parsedAddr, enableYamux)))
var tracer opentracing.Tracer
span := opentracing.SpanFromContext(ctx)
// If the context contains a trace span, trace all client comms
if span != nil {
tracer = span.Tracer()
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
dialOpts = append(dialOpts,
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, grpcAddr, dialOpts...)
if err != nil {
return nil, err
}
*/
client := ttrpc.NewClient(conn)
client := ttrpc.NewClient(conn, ttrpc.WithUnaryClientInterceptor(TraceUnaryClientInterceptor()))
return &AgentClient{
AgentServiceClient: agentgrpc.NewAgentServiceClient(client),
@@ -119,6 +99,89 @@ func (c *AgentClient) Close() error {
return c.conn.Close()
}
// TraceUnaryClientInterceptor returns a ttrpc unary client interceptor that
// wraps each outgoing agent RPC in an OpenTelemetry client span and injects
// the span context into the request metadata, so the guest agent can link its
// own spans to the runtime's trace.
func TraceUnaryClientInterceptor() ttrpc.UnaryClientInterceptor {
return func(
ctx context.Context,
req *ttrpc.Request,
resp *ttrpc.Response,
ci *ttrpc.UnaryClientInfo,
invoker ttrpc.Invoker,
) error {
requestMetadata := make(ttrpc.MD)
tracer := otel.Tracer("kata")
var span trace.Span
// Start a client-kind span named after the RPC method (e.g. "ttrpc.CreateContainer").
ctx, span = tracer.Start(
ctx,
fmt.Sprintf("ttrpc.%s", req.Method),
trace.WithSpanKind(trace.SpanKindClient),
)
defer span.End()
// Serialize the span context into requestMetadata, attach it to ctx, and
// merge it into the outgoing request so it crosses the VM boundary.
inject(ctx, &requestMetadata)
ctx = ttrpc.WithMetadata(ctx, requestMetadata)
setRequest(req, &requestMetadata)
err := invoker(ctx, req, resp)
if err != nil {
span.SetAttributes(otelLabel.Key("RPC_ERROR").Bool(true))
}
// err can be nil, that will return an OK response code
// NOTE(review): the local variable `status` shadows the grpc `status`
// package imported above — consider renaming to `st` for clarity.
if status, _ := status.FromError(err); status != nil {
span.SetAttributes(otelLabel.Key("RPC_CODE").Uint((uint)(status.Code())))
span.SetAttributes(otelLabel.Key("RPC_MESSAGE").String(status.Message()))
}
return err
}
}
// metadataSupplier adapts ttrpc metadata to the carrier interface expected by
// the OpenTelemetry text-map propagator (Get/Set by string key).
type metadataSupplier struct {
metadata *ttrpc.MD
}
// Get returns the first value stored under key, or "" when the key is absent.
func (s *metadataSupplier) Get(key string) string {
values, ok := s.metadata.Get(key)
if !ok {
return ""
}
return values[0]
}
// Set stores value under key in the underlying ttrpc metadata.
func (s *metadataSupplier) Set(key string, value string) {
s.metadata.Set(key, value)
}
// inject writes the span context carried by ctx into metadata using the
// globally registered OpenTelemetry text-map propagator.
func inject(ctx context.Context, metadata *ttrpc.MD) {
otel.GetTextMapPropagator().Inject(ctx, &metadataSupplier{
metadata: metadata,
})
}
// setRequest merges md into req.Metadata: request entries whose key also
// appears in md are dropped and replaced by the md values, while entries for
// keys not present in md are kept unchanged.
func setRequest(req *ttrpc.Request, md *ttrpc.MD) {
newMD := make([]*ttrpc.KeyValue, 0)
for _, kv := range req.Metadata {
// not found in md, means that we can copy old kv
// otherwise, we will use the values in md to overwrite it
if _, found := md.Get(kv.Key); !found {
newMD = append(newMD, kv)
}
}
req.Metadata = newMD
// Append every (key, value) pair from md; a key may carry multiple values.
for k, values := range *md {
for _, v := range values {
req.Metadata = append(req.Metadata, &ttrpc.KeyValue{
Key: k,
Value: v,
})
}
}
}
// vsock scheme is self-defined to be kept from being parsed by grpc.
// Any format starting with "scheme://" will be parsed by grpc and we lose
// all address information because vsock scheme is not supported by grpc.

View File

@@ -1615,6 +1615,7 @@ func (q *qemu) hotplugDevice(ctx context.Context, devInfo interface{}, devType d
func (q *qemu) hotplugAddDevice(ctx context.Context, devInfo interface{}, devType deviceType) (interface{}, error) {
span, ctx := q.trace(ctx, "hotplugAddDevice")
defer span.End()
span.SetAttributes(otelLabel.Any("device", devInfo))
data, err := q.hotplugDevice(ctx, devInfo, devType, addDevice)
if err != nil {
@@ -1627,6 +1628,7 @@ func (q *qemu) hotplugAddDevice(ctx context.Context, devInfo interface{}, devTyp
func (q *qemu) hotplugRemoveDevice(ctx context.Context, devInfo interface{}, devType deviceType) (interface{}, error) {
span, ctx := q.trace(ctx, "hotplugRemoveDevice")
defer span.End()
span.SetAttributes(otelLabel.Any("device", devInfo))
data, err := q.hotplugDevice(ctx, devInfo, devType, removeDevice)
if err != nil {
@@ -1855,6 +1857,7 @@ func (q *qemu) addDevice(ctx context.Context, devInfo interface{}, devType devic
var err error
span, _ := q.trace(ctx, "addDevice")
defer span.End()
span.SetAttributes(otelLabel.Any("device", devInfo))
switch v := devInfo.(type) {
case types.Volume:

View File

@@ -191,7 +191,7 @@ func NewVMFromGrpc(ctx context.Context, v *pb.GrpcVM, config VMConfig) (*VM, err
// create agent instance
newAagentFunc := getNewAgentFunc(ctx)
agent := newAagentFunc()
agent.configureFromGrpc(hypervisor, v.Id, config.AgentConfig)
agent.configureFromGrpc(ctx, hypervisor, v.Id, config.AgentConfig)
return &VM{
id: v.Id,