mirror of
https://github.com/aljazceru/kata-containers.git
synced 2025-12-19 07:14:22 +01:00
tools: Add Unix socket support to agentl-ctl
Rather than specifying the VSOCK address as two CLI options (`--vsock-cid` and `--vsock-port`), allow the agent's ttRPC server address to be specified to the `agent-ctl` tool using a single URI `--server-address` CLI option. Since the ttrpc crate supports VSOCK and UNIX schemes, this allows the tool to be run inside the VM by specifying a UNIX address. Fixes: #549. Signed-off-by: James O. D. Hunt <james.o.hunt@intel.com>
This commit is contained in:
@@ -8,7 +8,7 @@
|
||||
use crate::types::{Config, Options};
|
||||
use crate::utils;
|
||||
use anyhow::{anyhow, Result};
|
||||
use nix::sys::socket::{connect, socket, AddressFamily, SockAddr, SockFlag, SockType};
|
||||
use nix::sys::socket::{connect, socket, AddressFamily, SockAddr, SockFlag, SockType, UnixAddr};
|
||||
use protocols::agent::*;
|
||||
use protocols::agent_ttrpc::*;
|
||||
use protocols::health::*;
|
||||
@@ -16,7 +16,8 @@ use protocols::health_ttrpc::*;
|
||||
use slog::{debug, info};
|
||||
use std::io;
|
||||
use std::io::Write; // XXX: for flush()
|
||||
use std::os::unix::io::RawFd;
|
||||
use std::os::unix::io::{IntoRawFd, RawFd};
|
||||
use std::os::unix::net::UnixStream;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use ttrpc;
|
||||
@@ -294,25 +295,123 @@ fn client_create_vsock_fd(cid: libc::c_uint, port: u32) -> Result<RawFd> {
|
||||
Ok(fd)
|
||||
}
|
||||
|
||||
fn create_ttrpc_client(cid: libc::c_uint, port: u32) -> Result<ttrpc::Client> {
|
||||
let fd = client_create_vsock_fd(cid, port).map_err(|e| {
|
||||
anyhow!(format!(
|
||||
"failed to create VSOCK connection (check agent is running): {:?}",
|
||||
e
|
||||
))
|
||||
})?;
|
||||
fn create_ttrpc_client(server_address: String) -> Result<ttrpc::Client> {
|
||||
if server_address == "" {
|
||||
return Err(anyhow!("server address cannot be blank"));
|
||||
}
|
||||
|
||||
let fields: Vec<&str> = server_address.split("://").collect();
|
||||
|
||||
if fields.len() != 2 {
|
||||
return Err(anyhow!("invalid server address URI"));
|
||||
}
|
||||
|
||||
let scheme = fields[0].to_lowercase();
|
||||
|
||||
let fd: RawFd = match scheme.as_str() {
|
||||
// Formats:
|
||||
//
|
||||
// - "unix://absolute-path" (domain socket)
|
||||
// (example: "unix:///tmp/domain.socket")
|
||||
//
|
||||
// - "unix://@absolute-path" (abstract socket)
|
||||
// (example: "unix://@/tmp/abstract.socket")
|
||||
//
|
||||
"unix" => {
|
||||
let mut abstract_socket = false;
|
||||
|
||||
let mut path = fields[1].to_string();
|
||||
|
||||
if path.starts_with('@') {
|
||||
abstract_socket = true;
|
||||
|
||||
// Remove the magic abstract-socket request character ('@')
|
||||
// and crucially add a trailing nul terminator (required to
|
||||
// interoperate with the ttrpc crate).
|
||||
path = path[1..].to_string() + &"\x00".to_string();
|
||||
}
|
||||
|
||||
if abstract_socket {
|
||||
let socket_fd = match socket(
|
||||
AddressFamily::Unix,
|
||||
SockType::Stream,
|
||||
SockFlag::empty(),
|
||||
None,
|
||||
) {
|
||||
Ok(s) => s,
|
||||
Err(e) => return Err(anyhow!(e).context("Failed to create Unix Domain socket")),
|
||||
};
|
||||
|
||||
let unix_addr = match UnixAddr::new_abstract(path.as_bytes()) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
return Err(
|
||||
anyhow!(e).context("Failed to create Unix Domain abstract socket")
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let sock_addr = SockAddr::Unix(unix_addr);
|
||||
|
||||
connect(socket_fd, &sock_addr).map_err(|e| {
|
||||
anyhow!(e).context("Failed to connect to Unix Domain abstract socket")
|
||||
})?;
|
||||
|
||||
socket_fd
|
||||
} else {
|
||||
let stream = match UnixStream::connect(path) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
return Err(
|
||||
anyhow!(e).context("failed to create named UNIX Domain stream socket")
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
stream.into_raw_fd()
|
||||
}
|
||||
}
|
||||
// Format: "vsock://cid:port"
|
||||
"vsock" => {
|
||||
let addr: Vec<&str> = fields[1].split(':').collect();
|
||||
|
||||
if addr.len() != 2 {
|
||||
return Err(anyhow!("invalid VSOCK server address URI"));
|
||||
}
|
||||
|
||||
let cid: u32 = match addr[0] {
|
||||
"-1" | "" => libc::VMADDR_CID_ANY,
|
||||
_ => match addr[0].parse::<u32>() {
|
||||
Ok(c) => c,
|
||||
Err(e) => return Err(anyhow!(e).context("VSOCK CID is not numeric")),
|
||||
},
|
||||
};
|
||||
|
||||
let port: u32 = match addr[1].parse::<u32>() {
|
||||
Ok(r) => r,
|
||||
Err(e) => return Err(anyhow!(e).context("VSOCK port is not numeric")),
|
||||
};
|
||||
|
||||
client_create_vsock_fd(cid, port).map_err(|e| {
|
||||
anyhow!(e).context("failed to create VSOCK connection (check agent is running)")
|
||||
})?
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow!("invalid server address URI scheme: {:?}", scheme));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(ttrpc::client::Client::new(fd))
|
||||
}
|
||||
|
||||
fn kata_service_agent(cid: libc::c_uint, port: u32) -> Result<AgentServiceClient> {
|
||||
let ttrpc_client = create_ttrpc_client(cid, port)?;
|
||||
fn kata_service_agent(server_address: String) -> Result<AgentServiceClient> {
|
||||
let ttrpc_client = create_ttrpc_client(server_address)?;
|
||||
|
||||
Ok(AgentServiceClient::new(ttrpc_client))
|
||||
}
|
||||
|
||||
fn kata_service_health(cid: libc::c_uint, port: u32) -> Result<HealthClient> {
|
||||
let ttrpc_client = create_ttrpc_client(cid, port)?;
|
||||
fn kata_service_health(server_address: String) -> Result<HealthClient> {
|
||||
let ttrpc_client = create_ttrpc_client(server_address)?;
|
||||
|
||||
Ok(HealthClient::new(ttrpc_client))
|
||||
}
|
||||
@@ -344,15 +443,10 @@ pub fn client(cfg: &Config, commands: Vec<&str>) -> Result<()> {
|
||||
|
||||
announce(cfg);
|
||||
|
||||
let cid = cfg.cid;
|
||||
let port = cfg.port;
|
||||
|
||||
let addr = format!("vsock://{}:{}", cid, port);
|
||||
|
||||
// Create separate connections for each of the services provided
|
||||
// by the agent.
|
||||
let client = kata_service_agent(cid, port as u32)?;
|
||||
let health = kata_service_health(cid, port as u32)?;
|
||||
let client = kata_service_agent(cfg.server_address.clone())?;
|
||||
let health = kata_service_health(cfg.server_address.clone())?;
|
||||
|
||||
let mut options = Options::new();
|
||||
|
||||
@@ -365,7 +459,7 @@ pub fn client(cfg: &Config, commands: Vec<&str>) -> Result<()> {
|
||||
options.insert("bundle-dir".to_string(), cfg.bundle_dir.clone());
|
||||
|
||||
info!(sl!(), "client setup complete";
|
||||
"server-address" => addr);
|
||||
"server-address" => cfg.server_address.to_string());
|
||||
|
||||
if cfg.interactive {
|
||||
return interactive_client_loop(&cfg, &mut options, &client, &health);
|
||||
@@ -533,8 +627,8 @@ fn interactive_client_loop(
|
||||
let mut repeat_count: i64 = 1;
|
||||
|
||||
loop {
|
||||
let cmdline = readline("Enter command")
|
||||
.map_err(|e| anyhow!(format!("failed to read line: {}", e)))?;
|
||||
let cmdline =
|
||||
readline("Enter command").map_err(|e| anyhow!(e).context("failed to read line"))?;
|
||||
|
||||
if cmdline == "" {
|
||||
continue;
|
||||
@@ -592,7 +686,7 @@ fn agent_cmd_health_check(
|
||||
|
||||
let reply = health
|
||||
.check(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -615,7 +709,7 @@ fn agent_cmd_health_version(
|
||||
|
||||
let reply = health
|
||||
.version(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -637,7 +731,7 @@ fn agent_cmd_sandbox_create(
|
||||
|
||||
let reply = client
|
||||
.create_sandbox(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -656,7 +750,7 @@ fn agent_cmd_sandbox_destroy(
|
||||
|
||||
let reply = client
|
||||
.destroy_sandbox(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -686,7 +780,7 @@ fn agent_cmd_container_create(
|
||||
|
||||
let reply = client
|
||||
.create_container(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -709,7 +803,7 @@ fn agent_cmd_container_remove(
|
||||
|
||||
let reply = client
|
||||
.remove_container(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -746,7 +840,7 @@ fn agent_cmd_container_exec(
|
||||
|
||||
let reply = client
|
||||
.exec_process(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -769,7 +863,7 @@ fn agent_cmd_container_stats(
|
||||
|
||||
let reply = client
|
||||
.stats_container(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -792,7 +886,7 @@ fn agent_cmd_container_pause(
|
||||
|
||||
let reply = client
|
||||
.pause_container(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -815,7 +909,7 @@ fn agent_cmd_container_resume(
|
||||
|
||||
let reply = client
|
||||
.resume_container(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -838,7 +932,7 @@ fn agent_cmd_container_start(
|
||||
|
||||
let reply = client
|
||||
.start_container(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -859,7 +953,7 @@ fn agent_cmd_sandbox_guest_details(
|
||||
|
||||
let reply = client
|
||||
.get_guest_details(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -889,7 +983,7 @@ fn agent_cmd_container_list_processes(
|
||||
|
||||
let reply = client
|
||||
.list_processes(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -914,7 +1008,7 @@ fn agent_cmd_container_wait_process(
|
||||
|
||||
let reply = client
|
||||
.wait_process(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -949,7 +1043,7 @@ fn agent_cmd_container_signal_process(
|
||||
|
||||
let reply = client
|
||||
.signal_process(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -968,7 +1062,7 @@ fn agent_cmd_sandbox_tracing_start(
|
||||
|
||||
let reply = client
|
||||
.start_tracing(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -987,7 +1081,7 @@ fn agent_cmd_sandbox_tracing_stop(
|
||||
|
||||
let reply = client
|
||||
.stop_tracing(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -1006,7 +1100,7 @@ fn agent_cmd_sandbox_update_interface(
|
||||
|
||||
let reply = client
|
||||
.update_interface(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
// FIXME: Implement 'UpdateInterface' fully.
|
||||
eprintln!("FIXME: 'UpdateInterface' not fully implemented");
|
||||
@@ -1031,7 +1125,7 @@ fn agent_cmd_sandbox_update_routes(
|
||||
|
||||
let reply = client
|
||||
.update_routes(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
// FIXME: Implement 'UpdateRoutes' fully.
|
||||
eprintln!("FIXME: 'UpdateRoutes' not fully implemented");
|
||||
@@ -1056,7 +1150,7 @@ fn agent_cmd_sandbox_list_interfaces(
|
||||
|
||||
let reply = client
|
||||
.list_interfaces(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
@@ -1075,7 +1169,7 @@ fn agent_cmd_sandbox_list_routes(
|
||||
|
||||
let reply = client
|
||||
.list_routes(&req, cfg.timeout_nano)
|
||||
.map_err(|e| anyhow!(format!("{}: {:?}", ERR_API_FAILED, e)))?;
|
||||
.map_err(|e| anyhow!("{:?}", e).context(ERR_API_FAILED))?;
|
||||
|
||||
info!(sl!(), "response received";
|
||||
"response" => format!("{:?}", reply));
|
||||
|
||||
Reference in New Issue
Block a user