Files
kata-containers/netmon/netmon.go
Sebastien Boeuf f1315908c7 netmon: Build netmon from the master Makefile
This commit modifies the Makefile at the root of this repository
so that the binary kata-netmon can be built from there.

Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
2018-09-14 09:15:53 -07:00

660 lines
17 KiB
Go

// Copyright (c) 2018 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package main
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log/syslog"
"net"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/sirupsen/logrus"
lSyslog "github.com/sirupsen/logrus/hooks/syslog"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
)
// The following types and structures have to be kept in sync with the
// description of the agent protocol. Those definitions need to be in their
// own separate package so that they can be imported directly from this code.
// The reason for not importing them now, is because importing the whole agent
// protocol adds up too much overhead because of the grpc protocol involved.
// IPFamily define the IP address family type.
type IPFamily int32
// IPAddress describes the IP address format expected by Kata API.
type IPAddress struct {
Family IPFamily `json:"family,omitempty"`
Address string `json:"address,omitempty"`
Mask string `json:"mask,omitempty"`
}
// Interface describes the network interface format expected by Kata API.
type Interface struct {
Device string `json:"device,omitempty"`
Name string `json:"name,omitempty"`
IPAddresses []*IPAddress `json:"IPAddresses,omitempty"`
Mtu uint64 `json:"mtu,omitempty"`
HwAddr string `json:"hwAddr,omitempty"`
PciAddr string `json:"pciAddr,omitempty"`
}
// Route describes the network route format expected by Kata API.
type Route struct {
Dest string `json:"dest,omitempty"`
Gateway string `json:"gateway,omitempty"`
Device string `json:"device,omitempty"`
Source string `json:"source,omitempty"`
Scope uint32 `json:"scope,omitempty"`
}
const (
netmonName = "kata-netmon"
kataCmd = "kata-network"
kataCLIAddIfaceCmd = "add-iface"
kataCLIDelIfaceCmd = "del-iface"
kataCLIUpdtRoutesCmd = "update-routes"
kataSuffix = "kata"
// For simplicity the code will only focus on IPv4 addresses for now.
netlinkFamily = netlink.FAMILY_V4
storageParentPath = "/var/run/kata-containers/netmon/sbs"
storageDirPerm = os.FileMode(0750)
// sharedFile is the name of the file that will be used to share
// the data between this process and the kata-runtime process
// responsible for updating the network.
sharedFile = "shared.json"
storageFilePerm = os.FileMode(0640)
)
// version is the netmon version. This variable is populated at build time.
var version = "unknown"
type netmonParams struct {
sandboxID string
runtimePath string
debug bool
logLevel string
}
type netmon struct {
netmonParams
storagePath string
sharedFile string
netIfaces map[int]Interface
linkUpdateCh chan netlink.LinkUpdate
linkDoneCh chan struct{}
rtUpdateCh chan netlink.RouteUpdate
rtDoneCh chan struct{}
netHandler *netlink.Handle
}
var netmonLog = logrus.New()
func printVersion() {
fmt.Printf("%s version %s\n", netmonName, version)
}
const componentDescription = `is a network monitoring process that is intended to be started in the
appropriate network namespace so that it can listen to any event related to
link and routes. Whenever a new interface or route is created/updated, it is
responsible for calling into the kata-runtime CLI to ask for the actual
creation/update of the given interface or route.
`
func printComponentDescription() {
fmt.Printf("\n%s %s\n", netmonName, componentDescription)
}
func parseOptions() netmonParams {
var version, help bool
params := netmonParams{}
flag.BoolVar(&help, "h", false, "describe component usage")
flag.BoolVar(&help, "help", false, "")
flag.BoolVar(&params.debug, "d", false, "enable debug mode")
flag.BoolVar(&version, "v", false, "display program version and exit")
flag.BoolVar(&version, "version", false, "")
flag.StringVar(&params.sandboxID, "s", "", "sandbox id (required)")
flag.StringVar(&params.runtimePath, "r", "", "runtime path (required)")
flag.StringVar(&params.logLevel, "log", "warn",
"log messages above specified level: debug, warn, error, fatal or panic")
flag.Parse()
if help {
printComponentDescription()
flag.PrintDefaults()
os.Exit(0)
}
if version {
printVersion()
os.Exit(0)
}
if params.sandboxID == "" {
fmt.Fprintf(os.Stderr, "Error: sandbox id is empty, one must be provided\n")
flag.PrintDefaults()
os.Exit(1)
}
if params.runtimePath == "" {
fmt.Fprintf(os.Stderr, "Error: runtime path is empty, one must be provided\n")
flag.PrintDefaults()
os.Exit(1)
}
return params
}
func newNetmon(params netmonParams) (*netmon, error) {
handler, err := netlink.NewHandle(netlinkFamily)
if err != nil {
return nil, err
}
n := &netmon{
netmonParams: params,
storagePath: filepath.Join(storageParentPath, params.sandboxID),
sharedFile: filepath.Join(storageParentPath, params.sandboxID, sharedFile),
netIfaces: make(map[int]Interface),
linkUpdateCh: make(chan netlink.LinkUpdate),
linkDoneCh: make(chan struct{}),
rtUpdateCh: make(chan netlink.RouteUpdate),
rtDoneCh: make(chan struct{}),
netHandler: handler,
}
if err := os.MkdirAll(n.storagePath, storageDirPerm); err != nil {
return nil, err
}
return n, nil
}
func (n *netmon) cleanup() {
os.RemoveAll(n.storagePath)
n.netHandler.Delete()
close(n.linkDoneCh)
close(n.rtDoneCh)
}
func (n *netmon) logger() *logrus.Entry {
fields := logrus.Fields{
"name": netmonName,
"pid": os.Getpid(),
"source": "netmon",
}
if n.sandboxID != "" {
fields["sandbox"] = n.sandboxID
}
return netmonLog.WithFields(fields)
}
func (n *netmon) setupLogger() error {
level, err := logrus.ParseLevel(n.logLevel)
if err != nil {
return err
}
netmonLog.SetLevel(level)
netmonLog.Formatter = &logrus.TextFormatter{TimestampFormat: time.RFC3339Nano}
hook, err := lSyslog.NewSyslogHook("", "", syslog.LOG_INFO|syslog.LOG_USER, netmonName)
if err != nil {
return err
}
netmonLog.AddHook(hook)
announceFields := logrus.Fields{
"runtime-path": n.runtimePath,
"debug": n.debug,
"log-level": n.logLevel,
}
n.logger().WithFields(announceFields).Info("announce")
return nil
}
func (n *netmon) listenNetlinkEvents() error {
if err := netlink.LinkSubscribe(n.linkUpdateCh, n.linkDoneCh); err != nil {
return err
}
return netlink.RouteSubscribe(n.rtUpdateCh, n.rtDoneCh)
}
// convertInterface converts a link and its IP addresses as defined by netlink
// package, into the Interface structure format expected by kata-runtime to
// describe an interface and its associated IP addresses.
func convertInterface(linkAttrs *netlink.LinkAttrs, addrs []netlink.Addr) Interface {
if linkAttrs == nil {
netmonLog.Warn("Link attributes are nil")
return Interface{}
}
var ipAddrs []*IPAddress
for _, addr := range addrs {
if addr.IPNet == nil {
continue
}
netMask, _ := addr.Mask.Size()
ipAddr := &IPAddress{
Family: IPFamily(netlinkFamily),
Address: addr.IP.String(),
Mask: fmt.Sprintf("%d", netMask),
}
ipAddrs = append(ipAddrs, ipAddr)
}
iface := Interface{
Device: linkAttrs.Name,
Name: linkAttrs.Name,
IPAddresses: ipAddrs,
Mtu: uint64(linkAttrs.MTU),
HwAddr: linkAttrs.HardwareAddr.String(),
}
netmonLog.WithField("interface", iface).Debug("Interface converted")
return iface
}
// convertRoutes converts a list of routes as defined by netlink package,
// into a list of Route structure format expected by kata-runtime to
// describe a set of routes.
func convertRoutes(netRoutes []netlink.Route) []Route {
var routes []Route
// Ignore routes with IPv6 addresses as this is not supported
// by Kata yet.
for _, netRoute := range netRoutes {
dst := ""
if netRoute.Dst != nil {
if netRoute.Dst.IP.To4() != nil {
dst = netRoute.Dst.IP.String()
} else {
netmonLog.WithField("destination", netRoute.Dst.IP.String()).Warn("Not IPv4 format")
}
}
src := ""
if netRoute.Src.To4() != nil {
src = netRoute.Src.String()
} else {
netmonLog.WithField("source", netRoute.Src.String()).Warn("Not IPv4 format")
}
gw := ""
if netRoute.Gw.To4() != nil {
gw = netRoute.Gw.String()
} else {
netmonLog.WithField("gateway", netRoute.Gw.String()).Warn("Not IPv4 format")
}
dev := ""
iface, err := net.InterfaceByIndex(netRoute.LinkIndex)
if err == nil {
dev = iface.Name
}
route := Route{
Dest: dst,
Gateway: gw,
Device: dev,
Source: src,
Scope: uint32(netRoute.Scope),
}
routes = append(routes, route)
}
netmonLog.WithField("routes", routes).Debug("Routes converted")
return routes
}
// scanNetwork lists all the interfaces it can find inside the current
// network namespace, and store them in-memory to keep track of them.
func (n *netmon) scanNetwork() error {
links, err := n.netHandler.LinkList()
if err != nil {
return err
}
for _, link := range links {
addrs, err := n.netHandler.AddrList(link, netlinkFamily)
if err != nil {
return err
}
linkAttrs := link.Attrs()
if linkAttrs == nil {
continue
}
iface := convertInterface(linkAttrs, addrs)
n.netIfaces[linkAttrs.Index] = iface
}
n.logger().Debug("Network scanned")
return nil
}
func (n *netmon) storeDataToSend(data interface{}) error {
// Marshal the data structure into a JSON bytes array.
jsonArray, err := json.Marshal(data)
if err != nil {
return err
}
// Store the JSON bytes array at the specified path.
return ioutil.WriteFile(n.sharedFile, jsonArray, storageFilePerm)
}
func (n *netmon) execKataCmd(subCmd string) error {
execCmd := exec.Command(n.runtimePath, kataCmd, subCmd, n.sandboxID, n.sharedFile)
n.logger().WithField("command", execCmd).Debug("Running runtime command")
// Make use of Run() to ensure the kata-runtime process has correctly
// terminated before to go further.
if err := execCmd.Run(); err != nil {
return err
}
// Remove the shared file after the command returned. At this point
// we know the content of the file is not going to be used anymore,
// and the file path can be reused for further commands.
return os.Remove(n.sharedFile)
}
func (n *netmon) addInterfaceCLI(iface Interface) error {
if err := n.storeDataToSend(iface); err != nil {
return err
}
return n.execKataCmd(kataCLIAddIfaceCmd)
}
func (n *netmon) delInterfaceCLI(iface Interface) error {
if err := n.storeDataToSend(iface); err != nil {
return err
}
return n.execKataCmd(kataCLIDelIfaceCmd)
}
func (n *netmon) updateRoutesCLI(routes []Route) error {
if err := n.storeDataToSend(routes); err != nil {
return err
}
return n.execKataCmd(kataCLIUpdtRoutesCmd)
}
func (n *netmon) updateRoutes() error {
// Get all the routes.
netlinkRoutes, err := n.netHandler.RouteList(nil, netlinkFamily)
if err != nil {
return err
}
// Translate them into Route structures.
routes := convertRoutes(netlinkRoutes)
// Update the routes through the Kata CLI.
return n.updateRoutesCLI(routes)
}
func (n *netmon) handleRTMNewAddr(ev netlink.LinkUpdate) error {
n.logger().Debug("Interface update not supported")
return nil
}
func (n *netmon) handleRTMDelAddr(ev netlink.LinkUpdate) error {
n.logger().Debug("Interface update not supported")
return nil
}
func (n *netmon) handleRTMNewLink(ev netlink.LinkUpdate) error {
// NEWLINK might be a lot of different things. We're interested in
// adding the interface (both to our list and by calling into the
// Kata CLI API) only if this has the flags UP and RUNNING, meaning
// we don't expect any further change on the interface, and that we
// are ready to add it.
linkAttrs := ev.Link.Attrs()
if linkAttrs == nil {
n.logger().Warn("The link attributes are nil")
return nil
}
// First, ignore if the interface name contains "kata". This way we
// are preventing from adding interfaces created by Kata Containers.
if strings.HasSuffix(linkAttrs.Name, kataSuffix) {
n.logger().Debugf("Ignore the interface %s because found %q",
linkAttrs.Name, kataSuffix)
return nil
}
// Check if the interface exist in the internal list.
if _, exist := n.netIfaces[int(ev.Index)]; exist {
n.logger().Debugf("Ignoring interface %s because already exist",
linkAttrs.Name)
return nil
}
// Now, check if the interface has been enabled to UP and RUNNING.
if (ev.Flags&unix.IFF_UP) != unix.IFF_UP ||
(ev.Flags&unix.IFF_RUNNING) != unix.IFF_RUNNING {
n.logger().Debugf("Ignore the interface %s because not UP and RUNNING",
linkAttrs.Name)
return nil
}
// Get the list of IP addresses associated with this interface.
addrs, err := n.netHandler.AddrList(ev.Link, netlinkFamily)
if err != nil {
return err
}
// Convert the interfaces in the appropriate structure format.
iface := convertInterface(linkAttrs, addrs)
// Add the interface through the Kata CLI.
if err := n.addInterfaceCLI(iface); err != nil {
return err
}
// Add the interface to the internal list.
n.netIfaces[linkAttrs.Index] = iface
// Complete by updating the routes.
return n.updateRoutes()
}
func (n *netmon) handleRTMDelLink(ev netlink.LinkUpdate) error {
// It can only delete if identical interface is found in the internal
// list of interfaces. Otherwise, the deletion will be ignored.
linkAttrs := ev.Link.Attrs()
if linkAttrs == nil {
n.logger().Warn("Link attributes are nil")
return nil
}
// First, ignore if the interface name contains "kata". This way we
// are preventing from deleting interfaces created by Kata Containers.
if strings.Contains(linkAttrs.Name, kataSuffix) {
n.logger().Debugf("Ignore the interface %s because found %q",
linkAttrs.Name, kataSuffix)
return nil
}
// Check if the interface exist in the internal list.
iface, exist := n.netIfaces[int(ev.Index)]
if !exist {
n.logger().Debugf("Ignoring interface %s because not found",
linkAttrs.Name)
return nil
}
if err := n.delInterfaceCLI(iface); err != nil {
return err
}
// Delete the interface from the internal list.
delete(n.netIfaces, linkAttrs.Index)
// Complete by updating the routes.
return n.updateRoutes()
}
func (n *netmon) handleRTMNewRoute(ev netlink.RouteUpdate) error {
// Add the route through updateRoutes(), only if the route refer to an
// interface that already exists in the internal list of interfaces.
if _, exist := n.netIfaces[ev.Route.LinkIndex]; !exist {
n.logger().Debugf("Ignoring route %+v since interface %d not found",
ev.Route, ev.Route.LinkIndex)
return nil
}
return n.updateRoutes()
}
func (n *netmon) handleRTMDelRoute(ev netlink.RouteUpdate) error {
// Remove the route through updateRoutes(), only if the route refer to
// an interface that already exists in the internal list of interfaces.
return n.updateRoutes()
}
func (n *netmon) handleLinkEvent(ev netlink.LinkUpdate) error {
n.logger().Debug("handleLinkEvent: netlink event received")
switch ev.Header.Type {
case unix.NLMSG_DONE:
n.logger().Debug("NLMSG_DONE")
return nil
case unix.NLMSG_ERROR:
n.logger().Error("NLMSG_ERROR")
return fmt.Errorf("Error while listening on netlink socket")
case unix.RTM_NEWADDR:
n.logger().Debug("RTM_NEWADDR")
return n.handleRTMNewAddr(ev)
case unix.RTM_DELADDR:
n.logger().Debug("RTM_DELADDR")
return n.handleRTMDelAddr(ev)
case unix.RTM_NEWLINK:
n.logger().Debug("RTM_NEWLINK")
return n.handleRTMNewLink(ev)
case unix.RTM_DELLINK:
n.logger().Debug("RTM_DELLINK")
return n.handleRTMDelLink(ev)
default:
n.logger().Warnf("Unknown msg type %v", ev.Header.Type)
}
return nil
}
func (n *netmon) handleRouteEvent(ev netlink.RouteUpdate) error {
n.logger().Debug("handleRouteEvent: netlink event received")
switch ev.Type {
case unix.RTM_NEWROUTE:
n.logger().Debug("RTM_NEWROUTE")
return n.handleRTMNewRoute(ev)
case unix.RTM_DELROUTE:
n.logger().Debug("RTM_DELROUTE")
return n.handleRTMDelRoute(ev)
default:
n.logger().Warnf("Unknown msg type %v", ev.Type)
}
return nil
}
func (n *netmon) handleEvents() (err error) {
for {
select {
case ev := <-n.linkUpdateCh:
if err = n.handleLinkEvent(ev); err != nil {
return err
}
case ev := <-n.rtUpdateCh:
if err = n.handleRouteEvent(ev); err != nil {
return err
}
}
}
}
func main() {
// Parse parameters.
params := parseOptions()
// Create netmon handler.
n, err := newNetmon(params)
if err != nil {
netmonLog.WithError(err).Fatal("newNetmon()")
os.Exit(1)
}
defer n.cleanup()
// Init logger.
if err := n.setupLogger(); err != nil {
netmonLog.WithError(err).Fatal("setupLogger()")
os.Exit(1)
}
// Scan the current interfaces.
if err := n.scanNetwork(); err != nil {
n.logger().WithError(err).Fatal("scanNetwork()")
os.Exit(1)
}
// Subscribe to the link listener.
if err := n.listenNetlinkEvents(); err != nil {
n.logger().WithError(err).Fatal("listenNetlinkEvents()")
os.Exit(1)
}
// Go into the main loop.
if err := n.handleEvents(); err != nil {
n.logger().WithError(err).Fatal("handleEvents()")
os.Exit(1)
}
}