名词解释
- trait:Rust 中定义共享行为的方式,接近其他语言里的接口或抽象类。trait 可以定义一组方法,结构体、枚举或其他类型实现这个 trait 后,就能提供对应行为。
Rust 异步编程
异步编程是一种常见的并发模型。它可以在少量 OS 线程上运行大量并发任务,同时通过 async/await 保持接近同步代码的写法。
在 Rust 中,可以从进程、线程和协程三个层次理解异步模型:
下层是进程,进程是持有资源的最小单位;中层是线程,线程是 CPU 调度的最小单位;上层是协程,协程关注协作式切换,把等待中的任务让出去,让运行时调度其他任务。
异步运行时负责调度这些协程对象。当某个协程等待 IO 时,它会把执行权交还给运行时,运行时再调度其他可运行的协程。对于 IO 密集型应用,这种方式能更充分地利用线程资源。
async/await 的使用
async/await 是 Rust 写异步代码的核心语法。它允许任务在等待某个操作完成时暂停自身,把线程让给其他任务。
一个简单例子:
use tokio::pubDate::{sleep, Duration};
async fn foo() -> u8 {
sleep(Duration::from_secs(1)).await;
5
}
// 另一个异步函数,调用 `foo` 并等待其结果
async fn bar() -> u8 {
let result = foo().await;
result + 1
}
#[tokio::main]
async fn main() {
// 调用 `bar` 并等待其结果
let result = bar().await;
println!("Result: {}", result); // 输出: Result: 6
}
Rust 中常见的 async 写法有两种: async fn 和 async {} 。两种写法都会返回一个实现了 Future trait 的值:
// `foo()` 返回一个实现了 `Future<Output = u8>` 的类型。
// `foo().await` 将会产生一个 u8 类型的值。
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// 这个 `async` 块会产生一个实现了 `Future<Output = u8>` 的类型。
async {
let x: u8 = foo().await;
x + 5
}
}
async fn 和 async {} 返回的 Future 都是惰性的。在被轮询之前,它们不会主动运行。最常见的驱动方式是对 Future 调用 .await 。
当代码 .await 一个 Future 时,当前异步函数会暂停执行。如果这个 Future 正在等待网络 IO,它会把线程让出来。等 IO 就绪后,executor 会通过 poll 恢复这个 Future 。
async lifetime
async fn 和普通函数的一个差异在生命周期上。它可以接收引用或其他非静态生命周期参数,然后返回一个受这些参数生命周期约束的 Future :
async fn foo(x: &u8) -> u8 { *x }
// 这与上面的函数完全等价
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
-
<'a>:生命周期参数,表示引用x的生命周期。 -
x: &'a u8:参数x是一个生命周期为'a的u8引用。 -
-> impl Future<Output = u8> + 'a:函数返回一个实现了Future的对象,输出类型是u8,生命周期受'a约束。
这意味着 async fn 返回的 Future 必须在相关引用仍然有效时 .await 。大多数情况下,我们会在调用后立即 .await ,比如 foo(&x).await ,这时生命周期问题很少暴露。
如果要把这种 Future 存起来,或者发送给其他 task / thread ,生命周期就会变得重要。常见做法是把参数和 async fn 调用一起封装进 async 块:
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR: `x` does not live long enough
}
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
Rust 异步和其他语言的区别
- Rust 中的
Future是惰性的,只有被轮询时才会继续执行。丢弃一个Future可以阻止它继续运行。 - Rust 异步模型强调零成本抽象。使用异步时可以避免不必要的堆分配和动态分发,这对性能和嵌入式场景都有帮助。
- Rust 不提供内置运行时。相反,运行时由社区维护的库提供。
- Rust 同时支持单线程和多线程运行时,选择取决于任务类型和部署环境。
Future trait
pub trait Future {
type Output; // Future计算完成时产生的值的类型
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Future 表示一个异步计算,也就是未来某个时刻会完成的操作。它的核心方法是 poll 。调用 poll 时,运行时会尝试推进这个 Future ,并判断它是否已经产出结果。
如果结果还没有准备好,比如正在等待某个事件, poll 不会阻塞线程,而是返回一个状态,表示这个 Future 还需要继续等待。
Poll
当调用 Future 的 poll 方法时会返回一个枚举类型的值:
-
Poll::Pending:这个Future还没完成。 -
Poll::Ready(val):这个Future已经完成,并返回结果val。
如果 Future 还在等待 IO 事件, poll 方法通常会从 Context 中取出 Waker ,把它注册到事件通知系统里,然后返回 Pending 。
等 IO 事件就绪后,事件通知系统会使用之前注册的 Waker 唤醒这个 Future ,executor 再次调用 poll 推进执行。
Waker 让 executor 只在事件就绪时唤醒任务,避免用死循环反复调用 poll 。这也是异步运行时能高效处理大量 IO 任务的关键。
struct SocketRead<'a> {
socket: &'a Socket
}
impl<'a> Future for SocketRead<'a> {
type Output = Vec<u8>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_'>) -> Poll<Self::Output> {
let data = self.socket.no_block_read::<Option<Vec<u8>>>(1024);
match data {
Some(data) => Poll::Ready(data),
None => {
REACTOR.registe_waker_and_event(self.socket, Type::Read, cx.waker().clone());
Poll::Pending
}
}
}
}
代码中的 REACTOR 就是前面提到的事件通知系统。当 socket 有数据可读时, REACTOR 会使用注册的 Waker 唤醒 SocketRead ,然后由 executor 再次调用 poll 。
Rust 异步调试
编译结果
Rust 异步是无栈协程实现,所有执行都发生在工作线程上:
-
Future执行时使用工作线程的栈,没有独立栈空间。 -
poll函数执行完毕后,本次调用栈会被回收。
执行异步任务时,工作线程会调用 poll ,并为这次调用建立栈帧,保存 poll 内部变量和上下文信息。 poll 返回后,无论状态是 Pending 还是 Ready ,这段调用栈都会被回收。
这会带来调试难点:任务正在执行时可以观察调用栈,任务挂起后,工作线程栈已经回到运行时逻辑,普通调试器很难看到完整的异步任务状态。
调测工具
tokio-tracing
tokio-tracing 提供结构化日志能力,可以记录异步任务的上下文信息。
use tracing::{info, instrument};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
#[instrument]
fn foo(ans: i32) {
info!("in foo");
}
fn main() {
tracing_subscriber::registry().with(fmt::layer()).init();
foo(42);
}
运行结果:
2022-04-10T02:44:12.885556Z INFO foo{ans=42}: test_tracing: in foo
#[instrument] 宏会自动生成函数跟踪信息,包括函数名和参数。 info! 宏用于写入日志。
tokio-console
use std::{sync::Arc, pubDate::Duration};
use tokio::{sync::Semaphore, task, pubDate::sleep};
#[tokio::main]
async fn main() {
// 注意. 初始化tracing收集
console_subscriber::init();
// 线程1的令牌桶1初始一个令牌,可以先打印1
let semaphore = Arc::new(Semaphore::new(1));
let cnt = 3;
let semaphore2 = semaphore.clone();
// 线程2的令牌桶2初始没有令牌,直到1打印后增加令牌
let semaphore_wait = Arc::new(Semaphore::new(0));
let semaphore_wait2 = semaphore_wait.clone();
// 注意. 使用task::Builder来增加task名字,否则等同tokio::spawn
let t1 = task::Builder::default()
.name("t1")
.spawn(async move {
for i in 0..cnt {
let permit = semaphore.acquire().await.unwrap();
print!("1 ");
// 注意. 增加等待时间,便于观测
sleep(Duration::from_secs(i)).await;
// 消耗令牌,不放回令牌桶1
permit.forget();
// 令牌桶2增加令牌,可以打印2
semaphore_wait2.add_permits(1);
}
})
.unwrap();
let t2 = task::Builder::default()
.name("t2")
.spawn(async move {
for i in 0..cnt {
let permit = semaphore_wait.acquire().await.unwrap();
print!("2 ");
// 注意. 增加等待时间,便于观测
sleep(Duration::from_secs(i)).await;
// 消耗令牌,不放回令牌桶2
permit.forget();
// 令牌桶1增加令牌,可以打印1
semaphore2.add_permits(1);
}
})
.unwrap();
tokio::try_join!(t1, t2).unwrap();
}
await-tree
use std::pubDate::Duration;
use await_tree::{Config, InstrumentAwait, Registry};
use futures::future::{join, pending};
use tokio::pubDate::sleep;
async fn bar(i: i32) {
// `&'static str` span
baz(i).instrument_await("baz in bar").await
}
async fn baz(i: i32) {
// runtime `String` span is also supported
pending()
.instrument_await(format!("pending in baz {i}"))
.await
}
async fn foo() {
// spans of joined futures will be siblings in the tree
join(
bar(3).instrument_await("bar"),
baz(2).instrument_await("baz"),
)
.await;
}
#[tokio::main]
async fn main() {
let registry = Registry::new(Config::default());
let root = registry.register((), "foo");
tokio::spawn(root.instrument(foo()));
sleep(Duration::from_secs(1)).await;
let tree = registry.get(()).unwrap().to_string();
println!("{tree}");
}
foo [1.003s]
bar [1.003s]
baz in bar [1.003s]
pending in baz 3 [1.003s]
baz [1.003s]
pending in baz 2 [1.003s]
openharmony
诉求
- 可以检测任务阻塞和执行时间过长
- 支持黑匣打印
- 支持性能调优
- 可以推出完整的异步栈
- 运行态避免额外性能 & 内存开销
- 易用性,避免大范围的侵入式修改
yinglong 框架
- pending 状态
- 组合
- 任务栈
- 组合信息