// Copyright (c) 2018 Intel Corporation // // SPDX-License-Identifier: Apache-2.0 // package main import ( "encoding/json" "flag" "fmt" "io/ioutil" "log/syslog" "net" "os" "os/exec" "path/filepath" "strings" "time" "github.com/sirupsen/logrus" lSyslog "github.com/sirupsen/logrus/hooks/syslog" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" ) // The following types and structures have to be kept in sync with the // description of the agent protocol. Those definitions need to be in their // own separate package so that they can be imported directly from this code. // The reason for not importing them now, is because importing the whole agent // protocol adds up too much overhead because of the grpc protocol involved. // IPFamily define the IP address family type. type IPFamily int32 // IPAddress describes the IP address format expected by Kata API. type IPAddress struct { Family IPFamily `json:"family,omitempty"` Address string `json:"address,omitempty"` Mask string `json:"mask,omitempty"` } // Interface describes the network interface format expected by Kata API. type Interface struct { Device string `json:"device,omitempty"` Name string `json:"name,omitempty"` IPAddresses []*IPAddress `json:"IPAddresses,omitempty"` Mtu uint64 `json:"mtu,omitempty"` HwAddr string `json:"hwAddr,omitempty"` PciAddr string `json:"pciAddr,omitempty"` } // Route describes the network route format expected by Kata API. type Route struct { Dest string `json:"dest,omitempty"` Gateway string `json:"gateway,omitempty"` Device string `json:"device,omitempty"` Source string `json:"source,omitempty"` Scope uint32 `json:"scope,omitempty"` } const ( netmonName = "kata-netmon" netmonVersion = "0.0.1" kataCmd = "kata-network" kataCLIAddIfaceCmd = "add-iface" kataCLIDelIfaceCmd = "del-iface" kataCLIUpdtRoutesCmd = "update-routes" kataSuffix = "kata" // For simplicity the code will only focus on IPv4 addresses for now. netlinkFamily = netlink.FAMILY_V4 storageParentPath = "/var/run/kata-containers/netmon/sbs" storageDirPerm = os.FileMode(0750) // sharedFile is the name of the file that will be used to share // the data between this process and the kata-runtime process // responsible for updating the network. sharedFile = "shared.json" storageFilePerm = os.FileMode(0640) ) type netmonParams struct { sandboxID string runtimePath string debug bool logLevel string } type netmon struct { netmonParams storagePath string sharedFile string netIfaces map[int]Interface linkUpdateCh chan netlink.LinkUpdate linkDoneCh chan struct{} rtUpdateCh chan netlink.RouteUpdate rtDoneCh chan struct{} netHandler *netlink.Handle } var netmonLog = logrus.New() func printVersion() { fmt.Printf("%s version %s\n", netmonName, netmonVersion) } const componentDescription = `is a network monitoring process that is intended to be started in the appropriate network namespace so that it can listen to any event related to link and routes. Whenever a new interface or route is created/updated, it is responsible for calling into the kata-runtime CLI to ask for the actual creation/update of the given interface or route. ` func printComponentDescription() { fmt.Printf("\n%s %s\n", netmonName, componentDescription) } func parseOptions() netmonParams { var version, help bool params := netmonParams{} flag.BoolVar(&help, "h", false, "describe component usage") flag.BoolVar(&help, "help", false, "") flag.BoolVar(¶ms.debug, "d", false, "enable debug mode") flag.BoolVar(&version, "v", false, "display program version and exit") flag.BoolVar(&version, "version", false, "") flag.StringVar(¶ms.sandboxID, "s", "", "sandbox id (required)") flag.StringVar(¶ms.runtimePath, "r", "", "runtime path (required)") flag.StringVar(¶ms.logLevel, "log", "warn", "log messages above specified level: debug, warn, error, fatal or panic") flag.Parse() if help { printComponentDescription() flag.PrintDefaults() os.Exit(0) } if version { printVersion() os.Exit(0) } if params.sandboxID == "" { fmt.Fprintf(os.Stderr, "Error: sandbox id is empty, one must be provided\n") flag.PrintDefaults() os.Exit(1) } if params.runtimePath == "" { fmt.Fprintf(os.Stderr, "Error: runtime path is empty, one must be provided\n") flag.PrintDefaults() os.Exit(1) } return params } func newNetmon(params netmonParams) (*netmon, error) { handler, err := netlink.NewHandle(netlinkFamily) if err != nil { return nil, err } n := &netmon{ netmonParams: params, storagePath: filepath.Join(storageParentPath, params.sandboxID), sharedFile: filepath.Join(storageParentPath, params.sandboxID, sharedFile), netIfaces: make(map[int]Interface), linkUpdateCh: make(chan netlink.LinkUpdate), linkDoneCh: make(chan struct{}), rtUpdateCh: make(chan netlink.RouteUpdate), rtDoneCh: make(chan struct{}), netHandler: handler, } if err := os.MkdirAll(n.storagePath, storageDirPerm); err != nil { return nil, err } return n, nil } func (n *netmon) cleanup() { os.RemoveAll(n.storagePath) n.netHandler.Delete() close(n.linkDoneCh) close(n.rtDoneCh) } func (n *netmon) logger() *logrus.Entry { fields := logrus.Fields{ "name": netmonName, "pid": os.Getpid(), "source": "netmon", } if n.sandboxID != "" { fields["sandbox"] = n.sandboxID } return netmonLog.WithFields(fields) } func (n *netmon) setupLogger() error { level, err := logrus.ParseLevel(n.logLevel) if err != nil { return err } netmonLog.SetLevel(level) netmonLog.Formatter = &logrus.TextFormatter{TimestampFormat: time.RFC3339Nano} hook, err := lSyslog.NewSyslogHook("", "", syslog.LOG_INFO|syslog.LOG_USER, netmonName) if err != nil { return err } netmonLog.AddHook(hook) announceFields := logrus.Fields{ "runtime-path": n.runtimePath, "debug": n.debug, "log-level": n.logLevel, } n.logger().WithFields(announceFields).Info("announce") return nil } func (n *netmon) listenNetlinkEvents() error { if err := netlink.LinkSubscribe(n.linkUpdateCh, n.linkDoneCh); err != nil { return err } return netlink.RouteSubscribe(n.rtUpdateCh, n.rtDoneCh) } // convertInterface converts a link and its IP addresses as defined by netlink // package, into the Interface structure format expected by kata-runtime to // describe an interface and its associated IP addresses. func convertInterface(linkAttrs *netlink.LinkAttrs, addrs []netlink.Addr) Interface { if linkAttrs == nil { netmonLog.Warn("Link attributes are nil") return Interface{} } var ipAddrs []*IPAddress for _, addr := range addrs { if addr.IPNet == nil { continue } netMask, _ := addr.Mask.Size() ipAddr := &IPAddress{ Family: IPFamily(netlinkFamily), Address: addr.IP.String(), Mask: fmt.Sprintf("%d", netMask), } ipAddrs = append(ipAddrs, ipAddr) } iface := Interface{ Device: linkAttrs.Name, Name: linkAttrs.Name, IPAddresses: ipAddrs, Mtu: uint64(linkAttrs.MTU), HwAddr: linkAttrs.HardwareAddr.String(), } netmonLog.WithField("interface", iface).Debug("Interface converted") return iface } // convertRoutes converts a list of routes as defined by netlink package, // into a list of Route structure format expected by kata-runtime to // describe a set of routes. func convertRoutes(netRoutes []netlink.Route) []Route { var routes []Route // Ignore routes with IPv6 addresses as this is not supported // by Kata yet. for _, netRoute := range netRoutes { dst := "" if netRoute.Dst != nil { if netRoute.Dst.IP.To4() != nil { dst = netRoute.Dst.IP.String() } else { netmonLog.WithField("destination", netRoute.Dst.IP.String()).Warn("Not IPv4 format") } } src := "" if netRoute.Src.To4() != nil { src = netRoute.Src.String() } else { netmonLog.WithField("source", netRoute.Src.String()).Warn("Not IPv4 format") } gw := "" if netRoute.Gw.To4() != nil { gw = netRoute.Gw.String() } else { netmonLog.WithField("gateway", netRoute.Gw.String()).Warn("Not IPv4 format") } dev := "" iface, err := net.InterfaceByIndex(netRoute.LinkIndex) if err == nil { dev = iface.Name } route := Route{ Dest: dst, Gateway: gw, Device: dev, Source: src, Scope: uint32(netRoute.Scope), } routes = append(routes, route) } netmonLog.WithField("routes", routes).Debug("Routes converted") return routes } // scanNetwork lists all the interfaces it can find inside the current // network namespace, and store them in-memory to keep track of them. func (n *netmon) scanNetwork() error { links, err := n.netHandler.LinkList() if err != nil { return err } for _, link := range links { addrs, err := n.netHandler.AddrList(link, netlinkFamily) if err != nil { return err } linkAttrs := link.Attrs() if linkAttrs == nil { continue } iface := convertInterface(linkAttrs, addrs) n.netIfaces[linkAttrs.Index] = iface } n.logger().Debug("Network scanned") return nil } func (n *netmon) storeDataToSend(data interface{}) error { // Marshal the data structure into a JSON bytes array. jsonArray, err := json.Marshal(data) if err != nil { return err } // Store the JSON bytes array at the specified path. return ioutil.WriteFile(n.sharedFile, jsonArray, storageFilePerm) } func (n *netmon) execKataCmd(subCmd string) error { execCmd := exec.Command(n.runtimePath, kataCmd, subCmd, n.sandboxID, n.sharedFile) n.logger().WithField("command", execCmd).Debug("Running runtime command") // Make use of Run() to ensure the kata-runtime process has correctly // terminated before to go further. if err := execCmd.Run(); err != nil { return err } // Remove the shared file after the command returned. At this point // we know the content of the file is not going to be used anymore, // and the file path can be reused for further commands. return os.Remove(n.sharedFile) } func (n *netmon) addInterfaceCLI(iface Interface) error { if err := n.storeDataToSend(iface); err != nil { return err } return n.execKataCmd(kataCLIAddIfaceCmd) } func (n *netmon) delInterfaceCLI(iface Interface) error { if err := n.storeDataToSend(iface); err != nil { return err } return n.execKataCmd(kataCLIDelIfaceCmd) } func (n *netmon) updateRoutesCLI(routes []Route) error { if err := n.storeDataToSend(routes); err != nil { return err } return n.execKataCmd(kataCLIUpdtRoutesCmd) } func (n *netmon) updateRoutes() error { // Get all the routes. netlinkRoutes, err := n.netHandler.RouteList(nil, netlinkFamily) if err != nil { return err } // Translate them into Route structures. routes := convertRoutes(netlinkRoutes) // Update the routes through the Kata CLI. return n.updateRoutesCLI(routes) } func (n *netmon) handleRTMNewAddr(ev netlink.LinkUpdate) error { n.logger().Debug("Interface update not supported") return nil } func (n *netmon) handleRTMDelAddr(ev netlink.LinkUpdate) error { n.logger().Debug("Interface update not supported") return nil } func (n *netmon) handleRTMNewLink(ev netlink.LinkUpdate) error { // NEWLINK might be a lot of different things. We're interested in // adding the interface (both to our list and by calling into the // Kata CLI API) only if this has the flags UP and RUNNING, meaning // we don't expect any further change on the interface, and that we // are ready to add it. linkAttrs := ev.Link.Attrs() if linkAttrs == nil { n.logger().Warn("The link attributes are nil") return nil } // First, ignore if the interface name contains "kata". This way we // are preventing from adding interfaces created by Kata Containers. if strings.HasSuffix(linkAttrs.Name, kataSuffix) { n.logger().Debugf("Ignore the interface %s because found %q", linkAttrs.Name, kataSuffix) return nil } // Check if the interface exist in the internal list. if _, exist := n.netIfaces[int(ev.Index)]; exist { n.logger().Debugf("Ignoring interface %s because already exist", linkAttrs.Name) return nil } // Now, check if the interface has been enabled to UP and RUNNING. if (ev.Flags&unix.IFF_UP) != unix.IFF_UP || (ev.Flags&unix.IFF_RUNNING) != unix.IFF_RUNNING { n.logger().Debugf("Ignore the interface %s because not UP and RUNNING", linkAttrs.Name) return nil } // Get the list of IP addresses associated with this interface. addrs, err := n.netHandler.AddrList(ev.Link, netlinkFamily) if err != nil { return err } // Convert the interfaces in the appropriate structure format. iface := convertInterface(linkAttrs, addrs) // Add the interface through the Kata CLI. if err := n.addInterfaceCLI(iface); err != nil { return err } // Add the interface to the internal list. n.netIfaces[linkAttrs.Index] = iface // Complete by updating the routes. return n.updateRoutes() } func (n *netmon) handleRTMDelLink(ev netlink.LinkUpdate) error { // It can only delete if identical interface is found in the internal // list of interfaces. Otherwise, the deletion will be ignored. linkAttrs := ev.Link.Attrs() if linkAttrs == nil { n.logger().Warn("Link attributes are nil") return nil } // First, ignore if the interface name contains "kata". This way we // are preventing from deleting interfaces created by Kata Containers. if strings.Contains(linkAttrs.Name, kataSuffix) { n.logger().Debugf("Ignore the interface %s because found %q", linkAttrs.Name, kataSuffix) return nil } // Check if the interface exist in the internal list. iface, exist := n.netIfaces[int(ev.Index)] if !exist { n.logger().Debugf("Ignoring interface %s because not found", linkAttrs.Name) return nil } if err := n.delInterfaceCLI(iface); err != nil { return err } // Delete the interface from the internal list. delete(n.netIfaces, linkAttrs.Index) // Complete by updating the routes. return n.updateRoutes() } func (n *netmon) handleRTMNewRoute(ev netlink.RouteUpdate) error { // Add the route through updateRoutes(), only if the route refer to an // interface that already exists in the internal list of interfaces. if _, exist := n.netIfaces[ev.Route.LinkIndex]; !exist { n.logger().Debugf("Ignoring route %+v since interface %d not found", ev.Route, ev.Route.LinkIndex) return nil } return n.updateRoutes() } func (n *netmon) handleRTMDelRoute(ev netlink.RouteUpdate) error { // Remove the route through updateRoutes(), only if the route refer to // an interface that already exists in the internal list of interfaces. return n.updateRoutes() } func (n *netmon) handleLinkEvent(ev netlink.LinkUpdate) error { n.logger().Debug("handleLinkEvent: netlink event received") switch ev.Header.Type { case unix.NLMSG_DONE: n.logger().Debug("NLMSG_DONE") return nil case unix.NLMSG_ERROR: n.logger().Error("NLMSG_ERROR") return fmt.Errorf("Error while listening on netlink socket") case unix.RTM_NEWADDR: n.logger().Debug("RTM_NEWADDR") return n.handleRTMNewAddr(ev) case unix.RTM_DELADDR: n.logger().Debug("RTM_DELADDR") return n.handleRTMDelAddr(ev) case unix.RTM_NEWLINK: n.logger().Debug("RTM_NEWLINK") return n.handleRTMNewLink(ev) case unix.RTM_DELLINK: n.logger().Debug("RTM_DELLINK") return n.handleRTMDelLink(ev) default: n.logger().Warnf("Unknown msg type %v", ev.Header.Type) } return nil } func (n *netmon) handleRouteEvent(ev netlink.RouteUpdate) error { n.logger().Debug("handleRouteEvent: netlink event received") switch ev.Type { case unix.RTM_NEWROUTE: n.logger().Debug("RTM_NEWROUTE") return n.handleRTMNewRoute(ev) case unix.RTM_DELROUTE: n.logger().Debug("RTM_DELROUTE") return n.handleRTMDelRoute(ev) default: n.logger().Warnf("Unknown msg type %v", ev.Type) } return nil } func (n *netmon) handleEvents() (err error) { for { select { case ev := <-n.linkUpdateCh: if err = n.handleLinkEvent(ev); err != nil { return err } case ev := <-n.rtUpdateCh: if err = n.handleRouteEvent(ev); err != nil { return err } } } } func main() { // Parse parameters. params := parseOptions() // Create netmon handler. n, err := newNetmon(params) if err != nil { netmonLog.WithError(err).Fatal("newNetmon()") os.Exit(1) } defer n.cleanup() // Init logger. if err := n.setupLogger(); err != nil { netmonLog.WithError(err).Fatal("setupLogger()") os.Exit(1) } // Scan the current interfaces. if err := n.scanNetwork(); err != nil { n.logger().WithError(err).Fatal("scanNetwork()") os.Exit(1) } // Subscribe to the link listener. if err := n.listenNetlinkEvents(); err != nil { n.logger().WithError(err).Fatal("listenNetlinkEvents()") os.Exit(1) } // Go into the main loop. if err := n.handleEvents(); err != nil { n.logger().WithError(err).Fatal("handleEvents()") os.Exit(1) } }