runtime: add new command to collect metrics from Kata containers

Add a new command to collect metrics and return metrics to Prometheus.

Signed-off-by: bin liu <bin@hyper.sh>
This commit is contained in:
bin liu
2020-06-10 17:40:47 +08:00
parent 186fed2a11
commit 1b75daa00f
280 changed files with 68037 additions and 96 deletions

View File

@@ -0,0 +1,107 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"context"
"github.com/sirupsen/logrus"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/oci"
"github.com/opencontainers/runtime-spec/specs-go"
)
// getContainer looks up the container cid through containersClient, with the
// lookup context scoped to the given containerd namespace.
func getContainer(containersClient containers.Store, namespace, cid string) (containers.Container, error) {
	return containersClient.Get(namespaces.WithNamespace(context.Background(), namespace), cid)
}
// isSandboxContainer reports whether the container is a sandbox container.
//
// The container's Spec is unmarshalled into an OCI runtime spec and its
// container type is compared with vc.PodSandbox. Every failure on the way
// (missing spec, unmarshal error, unexpected payload type, unknown container
// type) is logged and reported as "not a sandbox".
func isSandboxContainer(c *containers.Container) bool {
	if c.Spec == nil {
		monitorLog.WithField("container", c.ID).Error("container spec is nil")
		return false
	}

	// unmarshal from any to spec.
	v, err := typeurl.UnmarshalAny(c.Spec)
	if err != nil {
		monitorLog.WithError(err).Error("failed to Unmarshal container spec")
		return false
	}

	// convert to oci spec type. The assertion is checked so that a payload of
	// another type is reported instead of panicking the monitor.
	ociSpec, ok := v.(*specs.Spec)
	if !ok {
		monitorLog.WithField("container", c.ID).Errorf("unexpected container spec type %T", v)
		return false
	}

	// get container type
	containerType, err := oci.ContainerType(*ociSpec)
	if err != nil {
		monitorLog.WithError(err).Error("failed to get container type")
		return false
	}

	return containerType == vc.PodSandbox
}
// getSandboxes asks containerd for the Kata sandboxes of every namespace and
// returns them as a map of sandbox ID => namespace.
// It is used by initSandboxCache to seed the cache when the monitor starts.
func (ka *KataMonitor) getSandboxes() (map[string]string, error) {
	client, err := containerd.New(ka.containerdAddr)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	// enumerate all namespaces first.
	nsList, err := client.NamespaceService().List(context.Background())
	if err != nil {
		return nil, err
	}

	// only list Kata Containers pods/containers
	filter := "runtime.name==" + types.KataRuntimeName + `,labels."io.cri-containerd.kind"==sandbox`

	found := make(map[string]string)
	for _, ns := range nsList {
		nsCtx := namespaces.WithNamespace(context.Background(), ns)

		list, err := client.ContainerService().List(nsCtx, filter)
		if err != nil {
			return nil, err
		}

		for idx := range list {
			ctr := list[idx]
			sandbox := isSandboxContainer(&ctr)
			monitorLog.WithFields(logrus.Fields{"container": ctr.ID, "result": sandbox}).Debug("is this a sandbox container?")
			if sandbox {
				found[ctr.ID] = ns
			}
		}
	}

	return found, nil
}

View File

@@ -0,0 +1,76 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"testing"
criContainerdAnnotations "github.com/containerd/cri-containerd/pkg/annotations"
"github.com/containerd/typeurl"
"github.com/containerd/containerd/containers"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/assert"
)
// TestIsSandboxContainer exercises isSandboxContainer with a container that
// has no spec, a spec without annotations, and specs carrying different
// container type annotations.
func TestIsSandboxContainer(t *testing.T) {
	assert := assert.New(t)

	// a container without a spec is not a sandbox
	ctr := &containers.Container{}
	assert.Equal(false, isSandboxContainer(ctr), "should not be a sandbox container")

	spec := &specs.Spec{Annotations: map[string]string{}}
	marshalled, err := typeurl.MarshalAny(spec)
	assert.Nil(err, "MarshalAny failed for spec")
	ctr.Spec = marshalled

	// default container is a pod(sandbox) container
	assert.Equal(true, isSandboxContainer(ctr), "should be a sandbox container")

	type testCase struct {
		annotationKey   string
		annotationValue string
		result          bool
	}
	cases := []testCase{
		{criContainerdAnnotations.ContainerType, "", false},
		{criContainerdAnnotations.ContainerType, criContainerdAnnotations.ContainerTypeContainer, false},
		{criContainerdAnnotations.ContainerType, "pod", false},
		{criContainerdAnnotations.ContainerType, criContainerdAnnotations.ContainerTypeSandbox, true},
	}

	for i := range cases {
		tc := cases[i]

		// replace the annotations and re-marshal the spec into the container
		spec.Annotations = map[string]string{tc.annotationKey: tc.annotationValue}
		marshalled, err := typeurl.MarshalAny(spec)
		assert.Nil(err, "MarshalAny failed for spec")
		ctr.Spec = marshalled

		assert.Equal(tc.result, isSandboxContainer(ctr), "assert failed for checking if is a sandbox container")
	}
}

View File

@@ -0,0 +1,315 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"net"
"net/http"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
dto "github.com/prometheus/client_model/go"
)
// Names shared by the metrics code of this package.
const (
// promNamespaceMonitor is the Namespace of the monitor's own collectors and
// the prefix encodeMetricFamily puts on gathered metric names that lack it.
promNamespaceMonitor = "kata_monitor"
// HTTP response header names set by ProcessMetricsRequest.
contentTypeHeader = "Content-Type"
contentEncodingHeader = "Content-Encoding"
)
// Collectors describing the monitor itself, plus the pool of gzip writers
// used to compress metrics responses.
var (
	// runningShimCount is set by aggregateSandboxMetrics to the number of
	// sandboxes currently held in the sandbox cache.
	runningShimCount = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: promNamespaceMonitor,
		Name:      "running_shim_count",
		Help:      "Running shim count(running sandboxes).",
	})

	// scrapeCount is incremented once per call of ProcessMetricsRequest.
	scrapeCount = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: promNamespaceMonitor,
		Name:      "scrape_count",
		Help:      "Scrape count.",
	})

	// scrapeFailedCount is incremented when aggregating the sandbox metrics
	// of a request fails.
	scrapeFailedCount = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: promNamespaceMonitor,
		Name:      "scrape_failed_count",
		Help:      "Failed scrape count.",
	})

	// scrapeDurationsHistogram observes how long ProcessMetricsRequest took,
	// in milliseconds; buckets start at 1 and double 10 times.
	scrapeDurationsHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: promNamespaceMonitor,
		Name:      "scrape_durations_histogram_milliseconds",
		Help:      "Time used to scrape from shims",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 10),
	})

	// gzipPool recycles gzip writers between requests; a writer taken from
	// the pool must be Reset onto its destination before use.
	gzipPool = sync.Pool{
		New: func() interface{} {
			return gzip.NewWriter(nil)
		},
	}
)
// registerMetrics registers the monitor's own collectors with the default
// Prometheus registerer.
func registerMetrics() {
	prometheus.MustRegister(
		runningShimCount,
		scrapeCount,
		scrapeFailedCount,
		scrapeDurationsHistogram,
	)
}
// getMetricsAddress returns the metrics socket address of a sandbox. The
// address is read from the file `monitor_address` located at
// <containerd state path>/<runtime task path>/<namespace>/<sandboxID>/.
// The file content is returned as is.
func (km *KataMonitor) getMetricsAddress(sandboxID, namespace string) (string, error) {
	addrFile := filepath.Join(
		km.containerdStatePath,
		types.ContainerdRuntimeTaskPath,
		namespace,
		sandboxID,
		"monitor_address",
	)

	content, err := ioutil.ReadFile(addrFile)
	if err != nil {
		return "", err
	}

	return string(content), nil
}
// ProcessMetricsRequest is the HTTP handler serving metrics: it writes the
// metrics gathered in the monitor process followed by the metrics aggregated
// from the shims of all cached sandboxes.
//
// The response format is negotiated from the request headers and the body is
// gzip-compressed when the client accepts it. Every call increments
// scrapeCount and records its duration in scrapeDurationsHistogram.
func (km *KataMonitor) ProcessMetricsRequest(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	scrapeCount.Inc()
	defer func() {
		scrapeDurationsHistogram.Observe(float64(time.Since(start).Nanoseconds() / int64(time.Millisecond)))
	}()

	// gather metrics collected for management agent.
	// This happens before any response header is set: once Content-Encoding
	// announces gzip, a plain-text error body would be mislabeled and the
	// deferred gzip Close would append a gzip stream to it.
	mfs, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		monitorLog.WithError(err).Error("failed to Gather metrics from prometheus.DefaultGatherer")
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// prepare writer for writing response.
	contentType := expfmt.Negotiate(r.Header)

	// set response header
	header := w.Header()
	header.Set(contentTypeHeader, string(contentType))

	// create writer
	writer := io.Writer(w)
	if mutils.GzipAccepted(r.Header) {
		header.Set(contentEncodingHeader, "gzip")
		gz := gzipPool.Get().(*gzip.Writer)
		// deferred calls run in reverse order: Close flushes the stream
		// before the writer goes back to the pool.
		defer gzipPool.Put(gz)
		gz.Reset(w)
		defer gz.Close()
		writer = gz
	}

	// create encoder to encode metrics.
	encoder := expfmt.NewEncoder(writer, contentType)

	// encode metric gathered in current process
	if err := encodeMetricFamily(mfs, encoder); err != nil {
		monitorLog.WithError(err).Warnf("failed to encode metrics")
	}

	// aggregate sandboxes metrics and write to response by encoder
	if err := km.aggregateSandboxMetrics(encoder); err != nil {
		monitorLog.WithError(err).Errorf("failed aggregateSandboxMetrics")
		scrapeFailedCount.Inc()
	}
}
// encodeMetricFamily writes every metric family of mfs through encoder.
// A family whose name does not already start with promNamespaceMonitor is
// renamed in place to "<promNamespaceMonitor>_<name>" before being encoded.
// Encoding stops at the first error, which is returned.
func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
	for _, family := range mfs {
		if name := family.Name; name != nil && !strings.HasPrefix(*name, promNamespaceMonitor) {
			family.Name = mutils.String2Pointer(promNamespaceMonitor + "_" + *name)
		}

		// encode and write to output
		if err := encoder.Encode(family); err != nil {
			return err
		}
	}

	return nil
}
// aggregateSandboxMetrics collects the metrics of every cached sandbox from
// its shim, merges the metric families sharing a name across sandboxes and
// writes the merged families through encoder, ordered by family name.
//
// It also sets runningShimCount to the number of cached sandboxes. A sandbox
// whose metrics cannot be fetched is logged and left out; the only error
// returned is an encoding failure.
func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
	// Snapshot the cache while holding its lock. getAllSandboxes hands back
	// the cache's own map, which the events listener keeps modifying; ranging
	// over that map here without the lock would be a data race.
	km.sandboxCache.Lock()
	sandboxes := make(map[string]string, len(km.sandboxCache.sandboxes))
	for id, ns := range km.sandboxCache.sandboxes {
		sandboxes[id] = ns
	}
	km.sandboxCache.Unlock()

	// save running kata pods as a metrics.
	runningShimCount.Set(float64(len(sandboxes)))

	if len(sandboxes) == 0 {
		return nil
	}

	wg := &sync.WaitGroup{}

	// used to receive response; buffered for one result per sandbox so that
	// no job blocks on send.
	results := make(chan []*dto.MetricFamily, len(sandboxes))

	monitorLog.WithField("sandbox_count", len(sandboxes)).Debugf("sandboxes count")

	// get metrics from sandbox's shim
	for sandboxID, namespace := range sandboxes {
		wg.Add(1)
		go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) {
			// deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			sandboxMetrics, err := km.getSandboxMetrics(sandboxID, namespace)
			if err != nil {
				monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
			}

			results <- sandboxMetrics
			monitorLog.WithField("sandbox_id", sandboxID).Debug("job finished")
		}(sandboxID, namespace, results)

		monitorLog.WithField("sandbox_id", sandboxID).Debug("job started")
	}

	wg.Wait()
	monitorLog.Debug("all job finished")
	close(results)

	// metricsMap used to aggregate metrics from multiple sandboxes
	// key is MetricFamily.Name, and value is the merged MetricFamily
	metricsMap := make(map[string]*dto.MetricFamily)

	// merge MetricFamily list for the same MetricFamily.Name from multiple sandboxes.
	for sandboxMetrics := range results {
		for _, mf := range sandboxMetrics {
			// a family without a name cannot be merged or exposed.
			if mf == nil || mf.Name == nil {
				continue
			}

			// add MetricFamily.Metric to the existing MetricFamily instance
			if oldmf, found := metricsMap[*mf.Name]; found {
				oldmf.Metric = append(oldmf.Metric, mf.Metric...)
			} else {
				metricsMap[*mf.Name] = mf
			}
		}
	}

	// map iteration order is random: sort the names so that every scrape
	// emits the families in the same order.
	names := make([]string, 0, len(metricsMap))
	for name := range metricsMap {
		names = append(names, name)
	}
	sort.Strings(names)

	// write metrics to response.
	for _, name := range names {
		if err := encoder.Encode(metricsMap[name]); err != nil {
			return err
		}
	}

	return nil
}
// getSandboxMetrics fetches the metrics of one sandbox from its shim and
// returns them parsed into metric families.
//
// The shim's address is looked up with getMetricsAddress and dialed as an
// abstract unix socket (the address is prefixed with a NUL byte). The request
// is bounded by a 3 second timeout. An error is returned when the address
// cannot be read, the request fails, the shim answers with a status other
// than 200, or the body cannot be parsed.
func (km *KataMonitor) getSandboxMetrics(sandboxID, namespace string) ([]*dto.MetricFamily, error) {
	socket, err := km.getMetricsAddress(sandboxID, namespace)
	if err != nil {
		return nil, err
	}

	transport := &http.Transport{
		DisableKeepAlives: true,
		// the requested network and address are ignored: every connection
		// goes to the shim's socket.
		Dial: func(proto, addr string) (conn net.Conn, err error) {
			return net.Dial("unix", "\x00"+socket)
		},
	}

	client := http.Client{
		Timeout:   3 * time.Second,
		Transport: transport,
	}

	resp, err := client.Get("http://shim/metrics")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// anything but 200 does not carry metrics; report the status instead of
	// a confusing parse error on the body.
	if resp.StatusCode != http.StatusOK {
		return nil, &shimStatusError{sandboxID: sandboxID, status: resp.Status}
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	return parsePrometheusMetrics(sandboxID, body)
}

// shimStatusError reports a non-200 answer from a shim's metrics endpoint.
type shimStatusError struct {
	sandboxID string
	status    string
}

// Error implements the error interface.
func (e *shimStatusError) Error() string {
	return "unexpected status " + e.status + " from shim of sandbox " + e.sandboxID
}
// parsePrometheusMetrics decodes body, a payload in the Prometheus text
// format, into metric families sorted by name in ascending order.
//
// Every metric gets an extra label sandbox_id=<sandboxID>, and families whose
// name starts with "go_" or "process_" are renamed with a "kata_shim_" prefix.
// A decoding error other than io.EOF aborts parsing and is returned.
func parsePrometheusMetrics(sandboxID string, body []byte) ([]*dto.MetricFamily, error) {
	decoder := expfmt.NewDecoder(bytes.NewReader(body), expfmt.FmtText)

	// decode metrics from sandbox to MetricFamily
	families := make([]*dto.MetricFamily, 0)
	for {
		family := &dto.MetricFamily{}
		err := decoder.Decode(family)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}

		// tag each metric with the sandbox it was read from
		for _, metric := range family.Metric {
			metric.Label = append(metric.Label, &dto.LabelPair{
				Name:  mutils.String2Pointer("sandbox_id"),
				Value: mutils.String2Pointer(sandboxID),
			})
		}

		// Kata shim are using prometheus go client, add a prefix for metric name to avoid confusing
		if name := family.Name; name != nil {
			if strings.HasPrefix(*name, "go_") || strings.HasPrefix(*name, "process_") {
				family.Name = mutils.String2Pointer("kata_shim_" + *name)
			}
		}

		families = append(families, family)
	}

	// sort ASC
	sort.SliceStable(families, func(i, j int) bool {
		return *families[i].Name < *families[j].Name
	})

	return families, nil
}

View File

@@ -0,0 +1,139 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"bytes"
"strings"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
)
// shimMetricBody is a sample payload in the Prometheus text format, used as
// input by TestParsePrometheusMetrics. It holds four metric families: two
// named go_*, one named process_* and one (ttt) with neither prefix.
var (
shimMetricBody = `# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 23
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 37
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 6.8986e-05
go_gc_duration_seconds{quantile="0.25"} 0.000148349
go_gc_duration_seconds{quantile="0.5"} 0.000184765
go_gc_duration_seconds{quantile="0.75"} 0.000209099
go_gc_duration_seconds{quantile="1"} 0.000507322
go_gc_duration_seconds_sum 1.353545751
go_gc_duration_seconds_count 6491
# HELP ttt Help for ttt.
# TYPE ttt gauge
ttt 999
`
)
// TestParsePrometheusMetrics parses shimMetricBody and checks the resulting
// families: their order, their (possibly prefixed) names, their types and the
// sandbox_id label added to every metric.
func TestParsePrometheusMetrics(t *testing.T) {
	assert := assert.New(t)
	sandboxID := "sandboxID-abc"

	// parse metrics
	list, err := parsePrometheusMetrics(sandboxID, []byte(shimMetricBody))
	assert.Nil(err, "parsePrometheusMetrics should not return error")

	assert.Equal(4, len(list), "should return 4 metric families")

	// assert the first metric
	mf := list[0]
	assert.Equal("kata_shim_go_gc_duration_seconds", *mf.Name, "family name should be kata_shim_go_gc_duration_seconds")
	assert.Equal(1, len(mf.Metric), "metric count should be 1")
	assert.Equal("A summary of the GC invocation durations.", *mf.Help, "help should be `go_gc_duration_seconds A summary of the GC invocation durations.`")
	assert.Equal("SUMMARY", mf.Type.String(), "metric type should be summary")

	// get the metric
	m := mf.Metric[0]
	assert.Equal(1, len(m.Label), "should have only 1 labels")
	assert.Equal("sandbox_id", *m.Label[0].Name, "label name should be sandbox_id")
	assert.Equal(sandboxID, *m.Label[0].Value, "label value should be %s", sandboxID)

	summary := m.Summary
	assert.NotNil(summary, "summary should not be nil")
	// compare the values: NotNil on a literal is always true and checks nothing
	assert.Equal(uint64(6491), *summary.SampleCount, "summary count should be 6491")
	assert.Equal(1.353545751, *summary.SampleSum, "summary sum should be 1.353545751")

	quantiles := summary.Quantile
	assert.Equal(5, len(quantiles), "should have 5 quantiles")

	// the second
	assert.Equal(0.25, *quantiles[1].Quantile, "Quantile should be 0.25")
	assert.Equal(0.000148349, *quantiles[1].Value, "Value should be 0.000148349")

	// the last
	assert.Equal(1.0, *quantiles[4].Quantile, "Quantile should be 1")
	assert.Equal(0.000507322, *quantiles[4].Value, "Value should be 0.000507322")

	// assert the second metric; the label is read from this family's own
	// metric, not from the first family's.
	mf = list[1]
	assert.Equal("kata_shim_go_threads", *mf.Name, "family name should be kata_shim_go_threads")
	assert.Equal("GAUGE", mf.Type.String(), "metric type should be gauge")
	assert.Equal("sandbox_id", *mf.Metric[0].Label[0].Name, "label name should be sandbox_id")
	assert.Equal(sandboxID, *mf.Metric[0].Label[0].Value, "label value should be %s", sandboxID)

	// assert the third metric
	mf = list[2]
	assert.Equal("kata_shim_process_open_fds", *mf.Name, "family name should be kata_shim_process_open_fds")
	assert.Equal("GAUGE", mf.Type.String(), "metric type should be gauge")
	assert.Equal("sandbox_id", *mf.Metric[0].Label[0].Name, "label name should be sandbox_id")
	assert.Equal(sandboxID, *mf.Metric[0].Label[0].Value, "label value should be %s", sandboxID)

	// assert the last metric
	mf = list[3]
	assert.Equal("ttt", *mf.Name, "family name should be ttt")
	assert.Equal("GAUGE", mf.Type.String(), "metric type should be gauge")
	assert.Equal("sandbox_id", *mf.Metric[0].Label[0].Name, "label name should be sandbox_id")
	assert.Equal(sandboxID, *mf.Metric[0].Label[0].Value, "label value should be %s", sandboxID)
}
// TestEncodeMetricFamily registers two of the monitor's collectors, sets them
// to known values, encodes everything the default gatherer returns in text
// format and checks that both values show up in the output.
func TestEncodeMetricFamily(t *testing.T) {
	assert := assert.New(t)

	prometheus.MustRegister(runningShimCount)
	prometheus.MustRegister(scrapeCount)

	runningShimCount.Add(11)
	scrapeCount.Inc()
	scrapeCount.Inc()

	mfs, err := prometheus.DefaultGatherer.Gather()
	assert.Nil(err, "Gather should not return error")

	// create encoder
	buf := bytes.NewBufferString("")
	encoder := expfmt.NewEncoder(buf, expfmt.FmtText)

	// encode metrics to text format
	err = encodeMetricFamily(mfs, encoder)
	assert.Nil(err, "encodeMetricFamily should not return error")

	// here will be too many metrics,
	// we only check two metrics that we have set
	seen := make(map[string]bool)
	for _, line := range strings.Split(buf.String(), "\n") {
		if strings.HasPrefix(line, "#") {
			continue
		}

		fields := strings.Split(line, " ")
		if len(fields) != 2 {
			continue
		}

		// only check kata_monitor_running_shim_count and kata_monitor_scrape_count
		switch fields[0] {
		case "kata_monitor_running_shim_count":
			seen[fields[0]] = true
			assert.Equal("11", fields[1], "kata_monitor_running_shim_count should be 11")
		case "kata_monitor_scrape_count":
			seen[fields[0]] = true
			assert.Equal("2", fields[1], "kata_monitor_scrape_count should be 2")
		}
	}

	// without these checks the test would pass on output lacking both metrics
	assert.True(seen["kata_monitor_running_shim_count"], "kata_monitor_running_shim_count should be encoded")
	assert.True(seen["kata_monitor_scrape_count"], "kata_monitor_scrape_count should be encoded")
}

View File

@@ -0,0 +1,80 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"fmt"
"os"
"sync"
"github.com/containerd/containerd/defaults"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/sirupsen/logrus"
// register grpc event types
_ "github.com/containerd/containerd/api/events"
)
// monitorLog is the logger used throughout the katamonitor package; every
// entry carries the field source=kata-monitor. SetLogger replaces it.
var monitorLog = logrus.WithField("source", "kata-monitor")
// SetLogger makes the katamonitor package log through logger. The fields
// carried by the current package logger are added to the new one.
func SetLogger(logger *logrus.Entry) {
	monitorLog = logger.WithFields(monitorLog.Data)
}
// KataMonitor is the monitor agent: it keeps a cache of the Kata sandboxes
// known to containerd and serves their metrics.
type KataMonitor struct {
// containerdAddr is the address used to create containerd clients.
containerdAddr string
// containerdConfigFile is the containerd configuration file passed to
// NewKataMonitor.
containerdConfigFile string
// containerdStatePath is the State value of the containerd configuration;
// getMetricsAddress builds the path of a shim's address file from it.
containerdStatePath string
// sandboxCache holds the known sandboxes (sandbox ID => namespace).
sandboxCache *sandboxCache
}
// NewKataMonitor creates a KataMonitor talking to the containerd instance at
// containerdAddr.
//
// The containerd state directory is read from containerdConfigFile; when the
// file does not exist the default state directory is used. The sandbox cache
// is seeded from containerd, the monitor's own metrics are registered and a
// goroutine is started to keep the cache in sync with containerd events.
//
// An error is returned when containerdAddr is empty, the configuration file
// cannot be loaded, or the initial sandbox listing fails.
func NewKataMonitor(containerdAddr, containerdConfigFile string) (*KataMonitor, error) {
	if containerdAddr == "" {
		return nil, fmt.Errorf("containerd serve address missing")
	}

	containerdConf := &srvconfig.Config{
		State: defaults.DefaultStateDir,
	}

	// a missing configuration file is not an error: the default state
	// directory set above stays in effect.
	if err := srvconfig.LoadConfig(containerdConfigFile, containerdConf); err != nil && !os.IsNotExist(err) {
		return nil, err
	}

	ka := &KataMonitor{
		containerdAddr:       containerdAddr,
		containerdConfigFile: containerdConfigFile,
		containerdStatePath:  containerdConf.State,
		sandboxCache: &sandboxCache{
			Mutex:     &sync.Mutex{},
			sandboxes: make(map[string]string),
		},
	}

	if err := ka.initSandboxCache(); err != nil {
		return nil, err
	}

	// register metrics
	registerMetrics()

	// the listener only returns on failure; report it, since from then on
	// the sandbox cache is no longer updated.
	go func() {
		if err := ka.sandboxCache.startEventsListener(ka.containerdAddr); err != nil {
			monitorLog.WithError(err).Error("containerd events listener stopped")
		}
	}()

	return ka, nil
}
// initSandboxCache fills the sandbox cache with the sandboxes reported by
// getSandboxes. On error the cache is left untouched and the error returned.
func (ka *KataMonitor) initSandboxCache() error {
	found, err := ka.getSandboxes()
	if err == nil {
		ka.sandboxCache.init(found)
	}
	return err
}

View File

@@ -0,0 +1,170 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"context"
"sync"
"github.com/containerd/containerd"
"github.com/sirupsen/logrus"
"encoding/json"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/typeurl"
"github.com/kata-containers/kata-containers/src/runtime/pkg/types"
// Register grpc event types
_ "github.com/containerd/containerd/api/events"
)
// sandboxCache is the mutex-guarded set of sandboxes known to the monitor.
type sandboxCache struct {
// embedded mutex, locked by every method that touches sandboxes
*sync.Mutex
// sandboxes maps a sandbox ID to the containerd namespace it lives in
sandboxes map[string]string
}
// getAllSandboxes returns the cached sandboxes (sandbox ID => namespace).
// NOTE(review): the returned value is the cache's own map, not a copy, so a
// caller reading it races with later cache updates - confirm whether callers
// should get a snapshot instead.
func (sc *sandboxCache) getAllSandboxes() map[string]string {
	sc.Lock()
	all := sc.sandboxes
	sc.Unlock()
	return all
}
// deleteIfExists removes id from the cache. It returns the value that was
// stored for id and true, or "" and false when id was not cached.
func (sc *sandboxCache) deleteIfExists(id string) (string, bool) {
	sc.Lock()
	defer sc.Unlock()

	stored, ok := sc.sandboxes[id]
	if !ok {
		// not in sandbox cache
		return "", false
	}

	delete(sc.sandboxes, id)
	return stored, true
}
// putIfNotExists stores value under id unless id is already cached. It
// reports whether the entry was added; an existing entry is never replaced.
func (sc *sandboxCache) putIfNotExists(id, value string) bool {
	sc.Lock()
	defer sc.Unlock()

	if _, exists := sc.sandboxes[id]; exists {
		// already in sandbox cache
		return false
	}

	sc.sandboxes[id] = value
	return true
}
// init replaces the whole content of the cache with sandboxes. The map is
// stored as is, not copied.
func (sc *sandboxCache) init(sandboxes map[string]string) {
	sc.Lock()
	sc.sandboxes = sandboxes
	sc.Unlock()
}
// startEventsListener subscribes to containerd's container create/delete
// events and keeps the sandbox cache in sync with them.
//
// It blocks while events are processed and only returns when the containerd
// client cannot be created or the subscription's error channel delivers a
// value, so callers run it in its own goroutine.
func (sc *sandboxCache) startEventsListener(addr string) error {
	client, err := containerd.New(addr)
	if err != nil {
		return err
	}
	defer client.Close()

	ctx := context.Background()

	eventsClient := client.EventService()
	containerClient := client.ContainerService()

	// only need create/delete events.
	eventFilters := []string{
		`topic=="/containers/create"`,
		`topic=="/containers/delete"`,
	}

	eventsCh, errCh := eventsClient.Subscribe(ctx, eventFilters...)
	for {
		var e *events.Envelope
		select {
		case e = <-eventsCh:
		case err = <-errCh:
			monitorLog.WithError(err).Warn("get error from error chan")
			return err
		}

		if e == nil {
			continue
		}

		// re-encode the typed event as JSON; it is decoded below into the
		// struct matching the event's topic.
		var eventBody []byte
		if e.Event != nil {
			v, err := typeurl.UnmarshalAny(e.Event)
			if err != nil {
				monitorLog.WithError(err).Warn("cannot unmarshal an event from Any")
				continue
			}
			eventBody, err = json.Marshal(v)
			if err != nil {
				monitorLog.WithError(err).Warn("cannot marshal Any into JSON")
				continue
			}
		}

		switch e.Topic {
		case "/containers/create":
			// Namespace: k8s.io
			// Topic: /containers/create
			// Event: {
			//          "id":"6a2e22e6fffaf1dec63ddabf587ed56069b1809ba67a0d7872fc470528364e66",
			//          "image":"k8s.gcr.io/pause:3.1",
			//          "runtime":{"name":"io.containerd.kata.v2"}
			//        }
			cc := eventstypes.ContainerCreate{}
			if err := json.Unmarshal(eventBody, &cc); err != nil {
				monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerCreate failed")
				continue
			}

			// skip non-kata containers
			if cc.Runtime.Name != types.KataRuntimeName {
				continue
			}

			c, err := getContainer(containerClient, e.Namespace, cc.ID)
			if err != nil {
				monitorLog.WithError(err).WithField("container", cc.ID).Warn("failed to get container")
				continue
			}

			// if the container is a sandbox container,
			// means the VM is started, and can start to collect metrics from the VM.
			if isSandboxContainer(&c) {
				// we can simply put the container ID in sandboxes list if the container is a sandbox container
				sc.putIfNotExists(cc.ID, e.Namespace)
				monitorLog.WithField("container", cc.ID).Info("add sandbox to cache")
			}

		case "/containers/delete":
			// Namespace: k8s.io
			// Topic: /containers/delete
			// Event: {
			//          "id":"73ec10d2e38070f930310687ab46bbaa532c79d5680fd7f18fff99f759d9385e"
			//        }
			cd := eventstypes.ContainerDelete{}
			if err := json.Unmarshal(eventBody, &cd); err != nil {
				monitorLog.WithError(err).WithField("body", string(eventBody)).Warn("unmarshal ContainerDelete failed")
				// without a decoded event there is no container ID to delete
				continue
			}

			// if container in sandboxes list, it must be the pause container in the sandbox,
			// so the container id is the sandbox id
			// we can simply delete the container from sandboxes list
			// the last container in a sandbox is deleted, means the VM will stop.
			_, deleted := sc.deleteIfExists(cd.ID)
			monitorLog.WithFields(logrus.Fields{"container": cd.ID, "result": deleted}).Info("delete sandbox from cache")

		default:
			monitorLog.WithFields(logrus.Fields{"Namespace": e.Namespace, "Topic": e.Topic, "Event": string(eventBody)}).Error("other events")
		}
	}
}

View File

@@ -0,0 +1,49 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package katamonitor
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
// TestSandboxCache checks init, putIfNotExists and deleteIfExists of
// sandboxCache through the size of the cache and the values they return.
func TestSandboxCache(t *testing.T) {
	assert := assert.New(t)
	sc := &sandboxCache{
		Mutex:     &sync.Mutex{},
		sandboxes: make(map[string]string),
	}

	sc.init(map[string]string{"111": "222"})

	// the cache is queried again for every size check, so the test does not
	// depend on getAllSandboxes returning the cache's internal map.
	assert.Equal(1, len(sc.getAllSandboxes()))

	// put new item
	id := "new-id"
	value := "new-value"
	b := sc.putIfNotExists(id, value)
	assert.Equal(true, b)
	assert.Equal(2, len(sc.getAllSandboxes()))

	// put key that already exists; a different value is used so that the
	// delete below proves the stored value was not overwritten.
	b = sc.putIfNotExists(id, "other-value")
	assert.Equal(false, b)
	assert.Equal(2, len(sc.getAllSandboxes()))

	v, b := sc.deleteIfExists(id)
	assert.Equal(value, v)
	assert.Equal(true, b)
	assert.Equal(1, len(sc.getAllSandboxes()))

	// deleting again finds nothing
	v, b = sc.deleteIfExists(id)
	assert.Equal("", v)
	assert.Equal(false, b)
	assert.Equal(1, len(sc.getAllSandboxes()))
}

View File

@@ -0,0 +1,11 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package types
// Names shared with containerd.
const (
// KataRuntimeName is the containerd runtime name of Kata Containers; the
// monitor compares container runtime names against it.
KataRuntimeName = "io.containerd.kata.v2"
// ContainerdRuntimeTaskPath is a directory name the monitor joins onto the
// containerd state path, followed by the namespace and the sandbox ID.
ContainerdRuntimeTaskPath = "io.containerd.runtime.v2.task"
)

View File

@@ -1,3 +1,8 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package utils
import (

View File

@@ -1,10 +1,12 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package utils
import (
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
)
@@ -29,14 +31,3 @@ func GzipAccepted(header http.Header) bool {
// String2Pointer returns a pointer to a fresh copy of s.
func String2Pointer(s string) *string {
	p := new(string)
	*p = s
	return p
}
// EnsureFileDir makes sure the directory that contains file exists, creating
// it and any missing parents with mode 0755, like `mkdir -p`.
// file must be an absolute path; otherwise an error is returned and nothing
// is created.
func EnsureFileDir(file string) error {
	if filepath.IsAbs(file) {
		return os.MkdirAll(filepath.Dir(file), 0755)
	}

	return fmt.Errorf("file must be an absolute path")
}

View File

@@ -1,8 +1,12 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package utils
import (
"net/http"
"os"
"testing"
"github.com/stretchr/testify/assert"
@@ -41,45 +45,3 @@ func TestGzipAccepted(t *testing.T) {
assert.Equal(tc.result, b)
}
}
// TestEnsureFileDir checks that EnsureFileDir rejects a relative path and
// creates the parent directory of an absolute one.
func TestEnsureFileDir(t *testing.T) {
	assert := assert.New(t)

	// clear test directory; deferred so it also runs when the test panics
	defer os.RemoveAll("/tmp/kata-test")

	testCases := []struct {
		file string
		path string
		err  bool
	}{
		{
			file: "abc.txt",
			path: "",
			err:  true,
		},
		{
			file: "/tmp/kata-test/abc/def/igh.txt",
			path: "/tmp/kata-test/abc/def",
			err:  false,
		},
		{
			file: "/tmp/kata-test/abc/../def/igh.txt",
			path: "/tmp/kata-test/def",
			err:  false,
		},
	}

	for i := range testCases {
		tc := testCases[i]
		err := EnsureFileDir(tc.file)
		// assert error
		assert.Equal(tc.err, err != nil)

		if tc.err {
			continue
		}

		// assert directory created; fileInfo is nil when Stat fails, so it
		// is only inspected after the error check passed.
		fileInfo, err := os.Stat(tc.path)
		if !assert.Equal(nil, err) {
			continue
		}
		assert.Equal(true, fileInfo.IsDir())
	}
}