runtime-rs: service and runtime framework

1. service: Responsible for processing services, such as task service, image service
2. Responsible for implementing different runtimes, such as Virt-container,
Linux-container, Wasm-container

Fixes: #3785
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
Quanwei Zhou
2021-12-03 18:53:48 +08:00
committed by Fupan Li
parent 4296e3069f
commit bdfee005fa
37 changed files with 1713 additions and 83 deletions

View File

@@ -0,0 +1,18 @@
[package]
name = "service"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
[dependencies]
anyhow = "^1.0"
async-trait = "0.1.48"
slog = "2.5.2"
slog-scope = "4.4.0"
tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
ttrpc = { version = "0.6.0" }
common = { path = "../runtimes/common" }
containerd-shim-protos = { version = "0.2.0", features = ["async"]}
logging = { path = "../../../libs/logging"}
runtimes = { path = "../runtimes" }

View File

@@ -0,0 +1,14 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate slog;
logging::logger_with_subsystem!(sl, "service");
mod manager;
pub use manager::ServiceManager;
mod task_service;

View File

@@ -0,0 +1,107 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
os::unix::io::{FromRawFd, RawFd},
sync::Arc,
};
use anyhow::{Context, Result};
use common::message::{Action, Message};
use containerd_shim_protos::shim_async;
use runtimes::RuntimeHandlerManager;
use tokio::sync::mpsc::{channel, Receiver};
use ttrpc::asynchronous::Server;
use crate::task_service::TaskService;
/// message buffer size
const MESSAGE_BUFFER_SIZE: usize = 8;
pub struct ServiceManager {
receiver: Option<Receiver<Message>>,
handler: Arc<RuntimeHandlerManager>,
task_server: Option<Server>,
}
impl ServiceManager {
pub async fn new(id: &str, task_server_fd: RawFd) -> Result<Self> {
let (sender, receiver) = channel::<Message>(MESSAGE_BUFFER_SIZE);
let handler = Arc::new(
RuntimeHandlerManager::new(id, sender)
.await
.context("new runtime handler")?,
);
let mut task_server = unsafe { Server::from_raw_fd(task_server_fd) };
task_server = task_server.set_domain_unix();
Ok(Self {
receiver: Some(receiver),
handler,
task_server: Some(task_server),
})
}
pub async fn run(&mut self) -> Result<()> {
info!(sl!(), "begin to run service");
self.start().await.context("start")?;
let mut rx = self.receiver.take();
if let Some(rx) = rx.as_mut() {
while let Some(r) = rx.recv().await {
info!(sl!(), "receive action {:?}", &r.action);
let result = match r.action {
Action::Start => self.start().await.context("start listen"),
Action::Stop => self.stop_listen().await.context("stop listen"),
Action::Shutdown => {
self.stop_listen().await.context("stop listen")?;
break;
}
};
if let Some(ref sender) = r.resp_sender {
sender.send(result).await.context("send response")?;
}
}
}
info!(sl!(), "end to run service");
Ok(())
}
pub fn cleanup(id: &str) -> Result<()> {
RuntimeHandlerManager::cleanup(id)
}
async fn start(&mut self) -> Result<()> {
let task_service = Arc::new(Box::new(TaskService::new(self.handler.clone()))
as Box<dyn shim_async::Task + Send + Sync>);
let task_server = self.task_server.take();
let task_server = match task_server {
Some(t) => {
let mut t = t.register_service(shim_async::create_task(task_service));
t.start().await.context("task server start")?;
Some(t)
}
None => None,
};
self.task_server = task_server;
Ok(())
}
async fn stop_listen(&mut self) -> Result<()> {
let task_server = self.task_server.take();
let task_server = match task_server {
Some(mut t) => {
t.stop_listen().await;
Some(t)
}
None => None,
};
self.task_server = task_server;
Ok(())
}
}

View File

@@ -0,0 +1,82 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use async_trait::async_trait;
use common::types::{Request, Response};
use containerd_shim_protos::{api, shim_async};
use ttrpc::{self, r#async::TtrpcContext};
use runtimes::RuntimeHandlerManager;
pub(crate) struct TaskService {
handler: Arc<RuntimeHandlerManager>,
}
impl TaskService {
pub(crate) fn new(handler: Arc<RuntimeHandlerManager>) -> Self {
Self { handler }
}
}
async fn handler_message<TtrpcReq, TtrpcResp>(
s: &RuntimeHandlerManager,
ctx: &TtrpcContext,
req: TtrpcReq,
) -> ttrpc::Result<TtrpcResp>
where
Request: TryFrom<TtrpcReq>,
<Request as TryFrom<TtrpcReq>>::Error: std::fmt::Debug,
TtrpcResp: TryFrom<Response>,
<TtrpcResp as TryFrom<Response>>::Error: std::fmt::Debug,
{
let r = req
.try_into()
.map_err(|err| ttrpc::Error::Others(format!("failed to translate from shim {:?}", err)))?;
let logger = sl!().new(o!("steam id" => ctx.mh.stream_id));
debug!(logger, "====> task service {:?}", &r);
let resp = s
.handler_message(r)
.await
.map_err(|err| ttrpc::Error::Others(format!("failed to handler message {:?}", err)))?;
debug!(logger, "<==== task service {:?}", &resp);
Ok(resp
.try_into()
.map_err(|err| ttrpc::Error::Others(format!("failed to translate to shim {:?}", err)))?)
}
macro_rules! impl_service {
($($name: tt | $req: ty | $resp: ty),*) => {
#[async_trait]
impl shim_async::Task for TaskService {
$(async fn $name(&self, ctx: &TtrpcContext, req: $req) -> ttrpc::Result<$resp> {
handler_message(&self.handler, ctx, req).await
})*
}
};
}
impl_service!(
state | api::StateRequest | api::StateResponse,
create | api::CreateTaskRequest | api::CreateTaskResponse,
start | api::StartRequest | api::StartResponse,
delete | api::DeleteRequest | api::DeleteResponse,
pids | api::PidsRequest | api::PidsResponse,
pause | api::PauseRequest | api::Empty,
resume | api::ResumeRequest | api::Empty,
kill | api::KillRequest | api::Empty,
exec | api::ExecProcessRequest | api::Empty,
resize_pty | api::ResizePtyRequest | api::Empty,
update | api::UpdateTaskRequest | api::Empty,
wait | api::WaitRequest | api::WaitResponse,
stats | api::StatsRequest | api::StatsResponse,
connect | api::ConnectRequest | api::ConnectResponse,
shutdown | api::ShutdownRequest | api::Empty
);