mirror of
https://github.com/aljazceru/kata-containers.git
synced 2026-02-23 15:34:28 +01:00
shimv2: Add tracing
Add trace calls to shimv2 that create spans for functions in service.go. Tracing starts in New(), which is forked twice and is followed by either StartShim() or Create(). Tracing cannot start without the value for Trace enabled from the runtime config so load the config in New(), which results in it being loaded every time New() is called in addition to where it is originally loaded after Create(). Fixes #903 Signed-off-by: Chelsea Mafrica <chelsea.e.mafrica@intel.com>
This commit is contained in:
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/containerd/typeurl"
|
||||
ptypes "github.com/gogo/protobuf/types"
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sys/unix"
|
||||
@@ -78,6 +79,21 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
||||
vci.SetLogger(ctx, shimLog)
|
||||
katautils.SetLogger(ctx, shimLog, shimLog.Logger.Level)
|
||||
|
||||
// load runtime config so that tracing can start if enabled
|
||||
_, runtimeConfig, err := katautils.LoadConfiguration("", false, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create tracer
|
||||
_, err = katautils.CreateTracer("kata")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// create span
|
||||
span, ctx := trace(ctx, "New")
|
||||
defer span.Finish()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
s := &service{
|
||||
@@ -85,6 +101,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (cdshim.Shi
|
||||
pid: uint32(os.Getpid()),
|
||||
ctx: ctx,
|
||||
containers: make(map[string]*container),
|
||||
config: &runtimeConfig,
|
||||
events: make(chan interface{}, chSize),
|
||||
ec: make(chan exit, bufferSize),
|
||||
cancel: cancel,
|
||||
@@ -167,6 +184,13 @@ func newCommand(ctx context.Context, containerdBinary, id, containerdAddress str
|
||||
// StartShim willl start a kata shimv2 daemon which will implemented the
|
||||
// ShimV2 APIs such as create/start/update etc containers.
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
|
||||
// Stop tracing here since a new tracer will be created the next time New()
|
||||
// is called again after StartShim()
|
||||
defer katautils.StopTracing(s.ctx)
|
||||
|
||||
span, _ := trace(s.ctx, "StartShim")
|
||||
defer span.Finish()
|
||||
|
||||
bundlePath, err := os.Getwd()
|
||||
if err != nil {
|
||||
return "", err
|
||||
@@ -278,7 +302,22 @@ func getTopic(e interface{}) string {
|
||||
return cdruntime.TaskUnknownTopic
|
||||
}
|
||||
|
||||
func trace(ctx context.Context, name string) (opentracing.Span, context.Context) {
|
||||
if ctx == nil {
|
||||
logrus.WithField("type", "bug").Error("trace called before context set")
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, name)
|
||||
span.SetTag("source", "runtime")
|
||||
|
||||
return span, ctx
|
||||
}
|
||||
|
||||
func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Cleanup")
|
||||
defer span.Finish()
|
||||
|
||||
//Since the binary cleanup will return the DeleteResponse from stdout to
|
||||
//containerd, thus we must make sure there is no any outputs in stdout except
|
||||
//the returned response, thus here redirect the log to stderr in case there's
|
||||
@@ -334,6 +373,9 @@ func (s *service) Cleanup(ctx context.Context) (_ *taskAPI.DeleteResponse, err e
|
||||
|
||||
// Create a new sandbox or container with the underlying OCI runtime
|
||||
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *taskAPI.CreateTaskResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Create")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -387,6 +429,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
||||
|
||||
// Start a process
|
||||
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAPI.StartResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Start")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -435,6 +480,9 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (_ *taskAP
|
||||
|
||||
// Delete the initial process and container
|
||||
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *taskAPI.DeleteResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Delete")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -484,6 +532,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (_ *task
|
||||
|
||||
// Exec an additional process inside the container
|
||||
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Exec")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
rpcDurationsHistogram.WithLabelValues("exec").Observe(float64(time.Since(start).Nanoseconds() / int64(time.Millisecond)))
|
||||
@@ -519,6 +570,9 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (_ *p
|
||||
|
||||
// ResizePty of a process
|
||||
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "ResizePty")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -555,6 +609,9 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (_
|
||||
|
||||
// State returns runtime state information for a process
|
||||
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAPI.StateResponse, err error) {
|
||||
span, _ := trace(s.ctx, "State")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -600,11 +657,13 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (_ *taskAP
|
||||
Terminal: execs.tty.terminal,
|
||||
ExitStatus: uint32(execs.exitCode),
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
// Pause the container
|
||||
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Pause")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -641,6 +700,9 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (_ *ptypes
|
||||
|
||||
// Resume the container
|
||||
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Resume")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -675,6 +737,9 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (_ *ptyp
|
||||
|
||||
// Kill a process with the provided signal
|
||||
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Kill")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -733,6 +798,9 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (_ *ptypes.E
|
||||
// Since for kata, it cannot get the process's pid from VM,
|
||||
// thus only return the Shim's pid directly.
|
||||
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.PidsResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Pids")
|
||||
defer span.Finish()
|
||||
|
||||
var processes []*task.ProcessInfo
|
||||
|
||||
start := time.Now()
|
||||
@@ -753,6 +821,9 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (_ *taskAPI.
|
||||
|
||||
// CloseIO of a process
|
||||
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "CloseIO")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -791,6 +862,9 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
|
||||
|
||||
// Checkpoint the container
|
||||
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Checkpoint")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -802,6 +876,9 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
|
||||
|
||||
// Connect returns shim information such as the shim's pid
|
||||
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *taskAPI.ConnectResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Connect")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -819,6 +896,8 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (_ *ta
|
||||
}
|
||||
|
||||
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Shutdown")
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -832,6 +911,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
span.Finish()
|
||||
katautils.StopTracing(s.ctx)
|
||||
|
||||
s.cancel()
|
||||
|
||||
os.Exit(0)
|
||||
@@ -842,6 +924,9 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (_ *
|
||||
}
|
||||
|
||||
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAPI.StatsResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Stats")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -868,6 +953,9 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (_ *taskAP
|
||||
|
||||
// Update a running container
|
||||
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *ptypes.Empty, err error) {
|
||||
span, _ := trace(s.ctx, "Update")
|
||||
defer span.Finish()
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
err = toGRPC(err)
|
||||
@@ -897,6 +985,9 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (_ *
|
||||
|
||||
// Wait for a process to exit
|
||||
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (_ *taskAPI.WaitResponse, err error) {
|
||||
span, _ := trace(s.ctx, "Wait")
|
||||
defer span.Finish()
|
||||
|
||||
var ret uint32
|
||||
|
||||
start := time.Now()
|
||||
|
||||
Reference in New Issue
Block a user