diff --git a/src/runtime/pkg/kata-monitor/cri.go b/src/runtime/pkg/kata-monitor/cri.go
index b0cd82a01..0e9e06884 100644
--- a/src/runtime/pkg/kata-monitor/cri.go
+++ b/src/runtime/pkg/kata-monitor/cri.go
@@ -113,15 +113,15 @@ func parseEndpoint(endpoint string) (string, string, error) {
 	}
 }
 
-// getSandboxes gets ready sandboxes from the container engine and returns an updated sandboxMap
-func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool, error) {
-	newMap := make(map[string]bool)
+// syncSandboxes gets pods metadata from the container manager and updates the sandbox cache.
+func (km *KataMonitor) syncSandboxes(sandboxList []string) ([]string, error) {
 	runtimeClient, runtimeConn, err := getRuntimeClient(km.runtimeEndpoint)
 	if err != nil {
-		return newMap, err
+		return sandboxList, err
 	}
 	defer closeConnection(runtimeConn)
 
+	// TODO: if len(sandboxList) is 1, it is better to just call runtimeClient.PodSandboxStatus(...) targeting the single sandbox
 	filter := &pb.PodSandboxFilter{
 		State: &pb.PodSandboxStateValue{
 			State: pb.PodSandboxState_SANDBOX_READY,
@@ -134,32 +134,32 @@ func (km *KataMonitor) getSandboxes(sandboxMap map[string]bool) (map[string]bool
 	monitorLog.Tracef("ListPodSandboxRequest: %v", request)
 	r, err := runtimeClient.ListPodSandbox(context.Background(), request)
 	if err != nil {
-		return newMap, err
+		return sandboxList, err
 	}
 	monitorLog.Tracef("ListPodSandboxResponse: %v", r)
 
 	for _, pod := range r.Items {
-		// Use the cached data if available
-		if isKata, ok := sandboxMap[pod.Id]; ok {
-			newMap[pod.Id] = isKata
-			if isKata {
-				monitorLog.Debugf("KATA POD %s (cached)", pod.Id)
+		for _, sandbox := range sandboxList {
+			if pod.Id == sandbox {
+				km.sandboxCache.setMetadata(sandbox, sandboxKubeData{
+					uid:       pod.Metadata.Uid,
+					name:      pod.Metadata.Name,
+					namespace: pod.Metadata.Namespace,
+				})
+
+				sandboxList = removeFromSandboxList(sandboxList, sandbox)
+
+				monitorLog.WithFields(logrus.Fields{
+					"Pod Name":      pod.Metadata.Name,
+					"Pod Namespace": pod.Metadata.Namespace,
+					"Pod UID":       pod.Metadata.Uid,
+				}).Debugf("Synced KATA POD %s", pod.Id)
+
+				break
 			}
-			continue
 		}
-
-		// Check if a directory associated with the POD ID exist on the kata fs:
-		// if so we know that the POD is a kata one.
-		newMap[pod.Id] = checkSandboxFSExists(pod.Id)
-		if newMap[pod.Id] {
-			monitorLog.Debugf("KATA POD %s (new)", pod.Id)
-		}
-		monitorLog.WithFields(logrus.Fields{
-			"id":      pod.Id,
-			"is kata": newMap[pod.Id],
-			"pod":     pod,
-		}).Trace("")
 	}
-
-	return newMap, nil
+	// TODO: here we should mark the sandboxes we failed to retrieve info for: we should try a finite number
+	// of times to retrieve their metadata; if we keep failing, give up and remove them from the sandbox
+	// cache (with a Warning log).
+	return sandboxList, nil
 }
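The second TODO above leaves the retry bookkeeping unimplemented. Below is a minimal sketch of what it could look like, assuming a per-sandbox attempt counter owned by the caller; syncWithRetries, syncRetriesMax and attempts are invented names, not part of this patch:

const syncRetriesMax = 5

// syncWithRetries is a hypothetical wrapper around syncSandboxes: once a
// sandbox has failed syncRetriesMax metadata lookups in a row, it is dropped
// from the cache with a warning, as the TODO suggests.
func (km *KataMonitor) syncWithRetries(sandboxList []string, attempts map[string]int) []string {
	remaining, err := km.syncSandboxes(sandboxList)
	if err != nil {
		// Transient CRI failure: keep the whole list and retry at the next timer tick.
		return sandboxList
	}
	var stillPending []string
	for _, id := range remaining {
		attempts[id]++
		if attempts[id] < syncRetriesMax {
			stillPending = append(stillPending, id)
			continue
		}
		monitorLog.WithField("pod", id).Warn("no metadata after max retries, dropping sandbox")
		km.sandboxCache.deleteIfExists(id)
		delete(attempts, id)
	}
	return stillPending
}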
diff --git a/src/runtime/pkg/kata-monitor/metrics.go b/src/runtime/pkg/kata-monitor/metrics.go
index 5f5fa3e4c..d2958441b 100644
--- a/src/runtime/pkg/kata-monitor/metrics.go
+++ b/src/runtime/pkg/kata-monitor/metrics.go
@@ -141,7 +141,7 @@ func encodeMetricFamily(mfs []*dto.MetricFamily, encoder expfmt.Encoder) error {
 // aggregateSandboxMetrics will get metrics from one sandbox and do some processing
 func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
 	// get all kata sandboxes from cache
-	sandboxes := km.sandboxCache.getKataSandboxes()
+	sandboxes := km.sandboxCache.getSandboxList()
 	// save the number of running kata pods as a metric
 	runningShimCount.Set(float64(len(sandboxes)))
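The gauge keeps the same meaning here: since the cache is only ever fed from the kata sandbox fs, listing everything in it is equivalent to the old kata-only listing. For context, a sketch of how such a gauge is typically declared and registered with the Prometheus client library (the metric name and help text below are illustrative, not taken from this patch):

import "github.com/prometheus/client_golang/prometheus"

// Gauge tracking the number of running kata pod sandboxes (one shim each).
var runningShimCount = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "kata_monitor_running_shim_count",
	Help: "Number of running kata shims, i.e. running kata pod sandboxes.",
})

func init() {
	prometheus.MustRegister(runningShimCount)
}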
diff --git a/src/runtime/pkg/kata-monitor/monitor.go b/src/runtime/pkg/kata-monitor/monitor.go
index 593a15c88..caacfb222 100644
--- a/src/runtime/pkg/kata-monitor/monitor.go
+++ b/src/runtime/pkg/kata-monitor/monitor.go
@@ -53,7 +53,7 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
 		runtimeEndpoint: runtimeEndpoint,
 		sandboxCache: &sandboxCache{
 			Mutex:     &sync.Mutex{},
-			sandboxes: make(map[string]bool),
+			sandboxes: make(map[string]sandboxKubeData),
 		},
 	}
 
@@ -65,6 +65,15 @@ func NewKataMonitor(runtimeEndpoint string) (*KataMonitor, error) {
 	return km, nil
 }
 
+func removeFromSandboxList(sandboxList []string, sandboxToRemove string) []string {
+	for i, sandbox := range sandboxList {
+		if sandbox == sandboxToRemove {
+			return append(sandboxList[:i], sandboxList[i+1:]...)
+		}
+	}
+	return sandboxList
+}
+
 // startPodCacheUpdater will boot a thread to manage sandbox cache
 func (km *KataMonitor) startPodCacheUpdater() {
 	sbsWatcher, err := fsnotify.NewWatcher()
@@ -84,9 +93,24 @@ func (km *KataMonitor) startPodCacheUpdater() {
 		monitorLog.Debugf("started fs monitoring @%s", getSandboxFS())
 		break
 	}
-	// we refresh the pod cache once if we get multiple add/delete pod events in a short time (< podCacheRefreshDelaySeconds)
+	// Initial sync with the kata sandboxes already running
+	sbsFile, err := os.Open(getSandboxFS())
+	if err != nil {
+		monitorLog.WithError(err).Fatal("cannot open sandboxes fs")
+		os.Exit(1)
+	}
+	sandboxList, err := sbsFile.Readdirnames(0)
+	if err != nil {
+		monitorLog.WithError(err).Fatal("cannot read sandboxes fs")
+		os.Exit(1)
+	}
+	monitorLog.Debug("initial sync of sbs directory completed")
+	monitorLog.Tracef("pod list from sbs: %v", sandboxList)
+
+	// We should get kubernetes metadata from the container manager for each new kata sandbox we detect.
+	// It may take a while for data to be available, so we always wait podCacheRefreshDelaySeconds before checking.
 	cacheUpdateTimer := time.NewTimer(podCacheRefreshDelaySeconds * time.Second)
-	cacheUpdateTimerWasSet := false
+	cacheUpdateTimerIsSet := true
 	for {
 		select {
 		case event, ok := <-sbsWatcher.Events:
@@ -99,11 +123,18 @@ func (km *KataMonitor) startPodCacheUpdater() {
 			case fsnotify.Create:
 				splitPath := strings.Split(event.Name, string(os.PathSeparator))
 				id := splitPath[len(splitPath)-1]
-				if !km.sandboxCache.putIfNotExists(id, true) {
+				if !km.sandboxCache.putIfNotExists(id, sandboxKubeData{}) {
 					monitorLog.WithField("pod", id).Warn(
 						"CREATE event but pod already present in the sandbox cache")
 				}
+				sandboxList = append(sandboxList, id)
 				monitorLog.WithField("pod", id).Info("sandbox cache: added pod")
+				if !cacheUpdateTimerIsSet {
+					cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
+					cacheUpdateTimerIsSet = true
+					monitorLog.Debugf(
+						"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
+				}
 
 			case fsnotify.Remove:
 				splitPath := strings.Split(event.Name, string(os.PathSeparator))
@@ -112,26 +143,27 @@ func (km *KataMonitor) startPodCacheUpdater() {
 					monitorLog.WithField("pod", id).Warn(
 						"REMOVE event but pod was missing from the sandbox cache")
 				}
+				sandboxList = removeFromSandboxList(sandboxList, id)
 				monitorLog.WithField("pod", id).Info("sandbox cache: removed pod")
 			}
-			// While we process fs events directly to update the sandbox cache we need to sync with the
-			// container engine to ensure we are on sync with it: we can get out of sync in environments
-			// where kata workloads can be started by other processes than the container engine.
-			cacheUpdateTimerWasSet = cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
-			monitorLog.WithField("was reset", cacheUpdateTimerWasSet).Debugf(
-				"cache update timer fires in %d secs", podCacheRefreshDelaySeconds)
-
 		case <-cacheUpdateTimer.C:
-			monitorLog.Debugf("sync sandbox cache with the container engine")
-			sandboxes, err := km.getSandboxes(km.sandboxCache.getAllSandboxes())
+			cacheUpdateTimerIsSet = false
+			monitorLog.WithField("pod list", sandboxList).Debugf(
+				"retrieve pods metadata from the container manager")
+			sandboxList, err = km.syncSandboxes(sandboxList)
 			if err != nil {
-				monitorLog.WithError(err).Error("failed to get sandboxes")
+				monitorLog.WithError(err).Error("failed to get sandboxes metadata")
 				continue
 			}
-			monitorLog.WithField("count", len(sandboxes)).Info("synced sandbox cache with the container engine")
-			monitorLog.WithField("sandboxes", sandboxes).Trace("dump sandbox cache")
-			km.sandboxCache.set(sandboxes)
+			if len(sandboxList) > 0 {
+				monitorLog.WithField("sandboxes", sandboxList).Debugf(
+					"%d sandboxes still missing metadata", len(sandboxList))
+				cacheUpdateTimer.Reset(podCacheRefreshDelaySeconds * time.Second)
+				cacheUpdateTimerIsSet = true
+			}
+
+			monitorLog.WithField("sandboxes", km.sandboxCache.getSandboxList()).Trace("dump sandbox cache")
 		}
 	}
 }
@@ -155,7 +187,7 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
 
 // ListSandboxes lists all sandboxes running in Kata
 func (km *KataMonitor) ListSandboxes(w http.ResponseWriter, r *http.Request) {
-	sandboxes := km.sandboxCache.getKataSandboxes()
+	sandboxes := km.sandboxCache.getSandboxList()
 	for _, s := range sandboxes {
 		w.Write([]byte(fmt.Sprintf("%s\n", s)))
 	}
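Two details of removeFromSandboxList are worth pinning down: it removes at most the first occurrence of an id, and an id that is not in the list leaves it untouched (note that the append trick reuses the backing array of the input slice). A quick test sketch, not part of the patch, assuming it sits next to the package's other tests:

func TestRemoveFromSandboxList(t *testing.T) {
	list := []string{"a", "b", "c"}

	// Removing an existing id shifts the remaining entries left.
	list = removeFromSandboxList(list, "b")
	if len(list) != 2 || list[0] != "a" || list[1] != "c" {
		t.Fatalf("unexpected list after removal: %v", list)
	}

	// Removing an unknown id is a no-op.
	if got := removeFromSandboxList(list, "missing"); len(got) != 2 {
		t.Fatalf("removal of a missing id should be a no-op, got %v", got)
	}
}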
diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache.go b/src/runtime/pkg/kata-monitor/sandbox_cache.go
index 98978f193..f4f542b41 100644
--- a/src/runtime/pkg/kata-monitor/sandbox_cache.go
+++ b/src/runtime/pkg/kata-monitor/sandbox_cache.go
@@ -9,28 +9,31 @@ import (
 	"sync"
 )
 
+type sandboxKubeData struct {
+	uid       string
+	name      string
+	namespace string
+}
 type sandboxCache struct {
 	*sync.Mutex
-	// the bool value tracks if the pod is a kata one (true) or not (false)
-	sandboxes map[string]bool
+	// sandboxKubeData links the sandbox id from the container manager to its kubernetes pod metadata
+	sandboxes map[string]sandboxKubeData
 }
 
-func (sc *sandboxCache) getAllSandboxes() map[string]bool {
+func (sc *sandboxCache) getSandboxes() map[string]sandboxKubeData {
 	sc.Lock()
 	defer sc.Unlock()
 	return sc.sandboxes
 }
 
-func (sc *sandboxCache) getKataSandboxes() []string {
+func (sc *sandboxCache) getSandboxList() []string {
 	sc.Lock()
 	defer sc.Unlock()
-	var katasandboxes []string
-	for id, isKata := range sc.sandboxes {
-		if isKata {
-			katasandboxes = append(katasandboxes, id)
-		}
+	var sandboxList []string
+	for id := range sc.sandboxes {
+		sandboxList = append(sandboxList, id)
 	}
-	return katasandboxes
+	return sandboxList
 }
 
 func (sc *sandboxCache) deleteIfExists(id string) bool {
@@ -46,7 +49,7 @@ func (sc *sandboxCache) deleteIfExists(id string) bool {
 	return false
 }
 
-func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
+func (sc *sandboxCache) putIfNotExists(id string, value sandboxKubeData) bool {
 	sc.Lock()
 	defer sc.Unlock()
 
@@ -59,7 +62,14 @@ func (sc *sandboxCache) putIfNotExists(id string, value bool) bool {
 	return false
 }
 
-func (sc *sandboxCache) set(sandboxes map[string]bool) {
+func (sc *sandboxCache) setMetadata(id string, value sandboxKubeData) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	sc.sandboxes[id] = value
+}
+
+func (sc *sandboxCache) set(sandboxes map[string]sandboxKubeData) {
 	sc.Lock()
 	defer sc.Unlock()
 	sc.sandboxes = sandboxes
diff --git a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go
index 43b8c8c99..6dd2e7894 100644
--- a/src/runtime/pkg/kata-monitor/sandbox_cache_test.go
+++ b/src/runtime/pkg/kata-monitor/sandbox_cache_test.go
@@ -16,24 +16,24 @@ func TestSandboxCache(t *testing.T) {
 	assert := assert.New(t)
 	sc := &sandboxCache{
 		Mutex:     &sync.Mutex{},
-		sandboxes: make(map[string]bool),
+		sandboxes: make(map[string]sandboxKubeData),
 	}
 
-	scMap := map[string]bool{"111": true}
+	scMap := map[string]sandboxKubeData{"111": {"1-2-3", "test-name", "test-namespace"}}
 
 	sc.set(scMap)
 
-	scMap = sc.getAllSandboxes()
+	scMap = sc.getSandboxes()
 	assert.Equal(1, len(scMap))
 
 	// put new item
 	id := "new-id"
-	b := sc.putIfNotExists(id, true)
+	b := sc.putIfNotExists(id, sandboxKubeData{})
 	assert.Equal(true, b)
 	assert.Equal(2, len(scMap))
 
 	// put key that already exists
-	b = sc.putIfNotExists(id, true)
+	b = sc.putIfNotExists(id, sandboxKubeData{})
 	assert.Equal(false, b)
 
 	b = sc.deleteIfExists(id)
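Taken together, each sandbox now moves through two cache states: a placeholder entry written on the fsnotify CREATE event, later overwritten with the real kubernetes metadata by syncSandboxes. A standalone sketch of that flow (exampleCacheFlow and all metadata values are made up for illustration; the metadata literals mirror the test above):

func exampleCacheFlow() []string {
	sc := &sandboxCache{
		Mutex:     &sync.Mutex{},
		sandboxes: make(map[string]sandboxKubeData),
	}
	// fsnotify CREATE event: the sandbox id is known, its kube metadata is not yet.
	sc.putIfNotExists("pod-id", sandboxKubeData{})
	// Later, syncSandboxes resolves the metadata through the CRI and stores it.
	sc.setMetadata("pod-id", sandboxKubeData{
		uid:       "1-2-3",
		name:      "test-name",
		namespace: "test-namespace",
	})
	return sc.getSandboxList() // -> ["pod-id"]
}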