Files
kata-containers/virtcontainers/factory/cache/cache.go
Hui Zhu ace81155a4 factory: Make factory status can show status of VMCache server
This commit adds a new gRPC function Status to CacheService.  VMCache
server will reply the status of VMCache server.
Factory destroy will call gRPC Status to get the status of VMCache
server and output it when VMCache is enabled.

Fixes: #1395

Signed-off-by: Hui Zhu <teawater@hyper.sh>
2019-04-10 11:03:14 +08:00

128 lines
2.5 KiB
Go

// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// cache implements base vm factory on top of other base vm factory.
package cache
import (
"context"
"fmt"
"sync"
pb "github.com/kata-containers/runtime/protocols/cache"
vc "github.com/kata-containers/runtime/virtcontainers"
"github.com/kata-containers/runtime/virtcontainers/factory/base"
)
type cache struct {
base base.FactoryBase
cacheCh chan *vc.VM
closed chan<- int
wg sync.WaitGroup
closeOnce sync.Once
vmm map[*vc.VM]interface{}
vmmLock sync.RWMutex
}
// New creates a new cached vm factory.
func New(ctx context.Context, count uint, b base.FactoryBase) base.FactoryBase {
if count < 1 {
return b
}
cacheCh := make(chan *vc.VM)
closed := make(chan int, count)
c := cache{
base: b,
cacheCh: cacheCh,
closed: closed,
vmm: make(map[*vc.VM]interface{}),
}
for i := 0; i < int(count); i++ {
c.wg.Add(1)
go func() {
for {
vm, err := b.GetBaseVM(ctx, c.Config())
if err != nil {
c.wg.Done()
c.CloseFactory(ctx)
return
}
c.addToVmm(vm)
select {
case cacheCh <- vm:
// Because vm will not be relased or changed
// by cacheServer.GetBaseVM or removeFromVmm.
// So removeFromVmm can be called after vm send to cacheCh.
c.removeFromVmm(vm)
case <-closed:
c.removeFromVmm(vm)
vm.Stop()
c.wg.Done()
return
}
}
}()
}
return &c
}
func (c *cache) addToVmm(vm *vc.VM) {
c.vmmLock.Lock()
defer c.vmmLock.Unlock()
c.vmm[vm] = nil
}
func (c *cache) removeFromVmm(vm *vc.VM) {
c.vmmLock.Lock()
defer c.vmmLock.Unlock()
delete(c.vmm, vm)
}
// Config returns cache vm factory's base factory config.
func (c *cache) Config() vc.VMConfig {
return c.base.Config()
}
// GetVMStatus returns the status of the cached VMs.
func (c *cache) GetVMStatus() []*pb.GrpcVMStatus {
vs := []*pb.GrpcVMStatus{}
c.vmmLock.RLock()
defer c.vmmLock.RUnlock()
for vm := range c.vmm {
vs = append(vs, vm.GetVMStatus())
}
return vs
}
// GetBaseVM returns a base VM from cache factory's base factory.
func (c *cache) GetBaseVM(ctx context.Context, config vc.VMConfig) (*vc.VM, error) {
vm, ok := <-c.cacheCh
if ok {
return vm, nil
}
return nil, fmt.Errorf("cache factory is closed")
}
// CloseFactory closes the cache factory.
func (c *cache) CloseFactory(ctx context.Context) {
c.closeOnce.Do(func() {
for len(c.closed) < cap(c.closed) { // send sufficient closed signal
c.closed <- 0
}
c.wg.Wait()
close(c.cacheCh)
c.base.CloseFactory(ctx)
})
}