runtime-rs: enhance runtimes

1. support oom event
2. use ContainerProcess to store container_id and exec_id
3. support stats

Fixes: #3785
Signed-off-by: Zhongtao Hu <zhongtaohu.tim@linux.alibaba.com>
This commit is contained in:
Zhongtao Hu
2022-03-26 17:33:41 +08:00
committed by Fupan Li
parent 9887272db9
commit 10343b1f3d
23 changed files with 674 additions and 175 deletions

View File

@@ -60,10 +60,6 @@ impl ShimExecutor {
data.parse::<u32>().context(Error::ParsePid)
}
pub(crate) fn get_bundle_path(&self) -> Result<PathBuf> {
std::env::current_dir().context(Error::GetBundlePath)
}
pub(crate) fn socket_address(&self, id: &str) -> Result<PathBuf> {
if id.is_empty() {
return Err(anyhow!(Error::EmptySandboxId));
@@ -72,8 +68,6 @@ impl ShimExecutor {
let data = [&self.args.address, &self.args.namespace, id].join("/");
let mut hasher = sha2::Sha256::new();
hasher.update(data);
// Follow
// https://github.com/containerd/containerd/blob/main/runtime/v2/shim/util_unix.go#L68 to
// generate a shim socket path.
Ok(PathBuf::from(format!(
@@ -89,6 +83,8 @@ mod tests {
use super::*;
use serial_test::serial;
use kata_sys_util::spec::get_bundle_path;
#[test]
#[serial]
fn test_shim_executor() {
@@ -111,7 +107,7 @@ mod tests {
executor
.write_address(bundle_path, Path::new("12345"))
.unwrap();
let dir = executor.get_bundle_path().unwrap();
let dir = get_bundle_path().unwrap();
let file_path = &dir.join("address");
let buf = std::fs::read_to_string(file_path).unwrap();
assert_eq!(&buf, "12345");

View File

@@ -7,6 +7,7 @@
use anyhow::{Context, Result};
use containerd_shim_protos::api;
use protobuf::Message;
use std::{fs, path::Path};
use crate::{shim::ShimExecutor, Error};
@@ -30,6 +31,16 @@ impl ShimExecutor {
exited_time.set_seconds(seconds);
rsp.set_exited_at(exited_time);
let address = self
.socket_address(&self.args.id)
.context("socket address")?;
let trim_path = address.strip_prefix("unix://").context("trim path")?;
let file_path = Path::new("/").join(trim_path);
let file_path = file_path.as_path();
if std::fs::metadata(&file_path).is_ok() {
info!(sl!(), "remote socket path: {:?}", &file_path);
fs::remove_file(file_path).ok();
}
service::ServiceManager::cleanup(&self.args.id).context("cleanup")?;
Ok(rsp)
}

View File

@@ -7,6 +7,7 @@
use std::os::unix::io::RawFd;
use anyhow::{Context, Result};
use kata_sys_util::spec::get_bundle_path;
use crate::{
logger,
@@ -18,7 +19,7 @@ impl ShimExecutor {
pub async fn run(&mut self) -> Result<()> {
crate::panic_hook::set_panic_hook();
let sid = self.args.id.clone();
let bundle_path = self.get_bundle_path().context("get bundle")?;
let bundle_path = get_bundle_path().context("get bundle")?;
let path = bundle_path.join("log");
let _logger_guard =
logger::set_logger(path.to_str().unwrap(), &sid, self.args.debug).context("set logger");
@@ -36,12 +37,18 @@ impl ShimExecutor {
async fn do_run(&mut self) -> Result<()> {
info!(sl!(), "start to run");
self.args.validate(false).context("validata")?;
self.args.validate(false).context("validate")?;
let server_fd = get_server_fd().context("get server fd")?;
let mut service_manager = service::ServiceManager::new(&self.args.id, server_fd)
.await
.context("new runtime server")?;
let mut service_manager = service::ServiceManager::new(
&self.args.id,
&self.args.publish_binary,
&self.args.address,
&self.args.namespace,
server_fd,
)
.await
.context("new shim server")?;
service_manager.run().await.context("run")?;
Ok(())

View File

@@ -12,6 +12,7 @@ use std::{
};
use anyhow::{anyhow, Context, Result};
use kata_sys_util::spec::get_bundle_path;
use kata_types::{container::ContainerType, k8s};
use unix_socket::UnixListener;
@@ -32,7 +33,7 @@ impl ShimExecutor {
}
fn do_start(&mut self) -> Result<PathBuf> {
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let bundle_path = get_bundle_path().context("get bundle path")?;
let spec = self.load_oci_spec(&bundle_path)?;
let (container_type, id) = k8s::container_type_with_id(&spec);
@@ -66,7 +67,7 @@ impl ShimExecutor {
return Err(anyhow!("invalid param"));
}
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let bundle_path = get_bundle_path().context("get bundle path")?;
let self_exec = std::env::current_exe().map_err(Error::SelfExec)?;
let mut command = std::process::Command::new(self_exec);
@@ -109,7 +110,7 @@ impl ShimExecutor {
fn get_shim_info_from_sandbox(&self, sandbox_id: &str) -> Result<(PathBuf, u32)> {
// All containers of a pod share the same pod socket address.
let address = self.socket_address(sandbox_id).context("socket address")?;
let bundle_path = self.get_bundle_path().context("get bundle path")?;
let bundle_path = get_bundle_path().context("get bundle path")?;
let parent_bundle_path = Path::new(&bundle_path)
.parent()
.unwrap_or_else(|| Path::new(""));
@@ -165,19 +166,13 @@ mod tests {
let cmd = executor.new_command().unwrap();
assert_eq!(cmd.get_args().len(), 8);
assert_eq!(cmd.get_envs().len(), 1);
assert_eq!(
cmd.get_current_dir().unwrap(),
executor.get_bundle_path().unwrap()
);
assert_eq!(cmd.get_current_dir().unwrap(), get_bundle_path().unwrap());
executor.args.debug = true;
let cmd = executor.new_command().unwrap();
assert_eq!(cmd.get_args().len(), 9);
assert_eq!(cmd.get_envs().len(), 1);
assert_eq!(
cmd.get_current_dir().unwrap(),
executor.get_bundle_path().unwrap()
);
assert_eq!(cmd.get_current_dir().unwrap(), get_bundle_path().unwrap());
}
#[test]