avatar

Rust 异步编程

AI Summary
博客介绍了 Rust 异步编程。解释了 trait 的概念,阐述了进程 — 线程 — 协程的异步模型。讲解了 async/await 的用法及 async lifetime。详细说明了 Future trait 及其 Poll 枚举类型。还涉及了异步调试工具如 tokio-tracing、tokio-console、await-tree 以及 openharmony 的 yinglong 框架及其特定需求。最后给出了一篇关于 reactor 模式的文章链接。

名次解释#

  • trait: 是一种定义共享行为的方式,它类似于其他编程语言中的接口(interface)或抽象类(abstract class)。trait 允许你定义一组方法,这些方法可以被任何类型的结构体、枚举或实现该 trait 的类型所使用

Rust 异步编程#

异步编程,或者叫异步,是一种被越来越多编程语言支持的并发编程模型。它能够在一小撮 OS 线程上运行一大堆并发任务,同时还能通过 async/await 语法,保持原本同步编程的观感。

在 Rust 中主要应用的是 进程—线程—协程 异步模型,如下所示:

异步编程

下层是进程,进程是持有资源的最小单位;中层是线程,线程不持有资源,是 CPU 调度的最小单位;上层是协程,协程既不持有资源、也不在意 CPU 的调度,它仅仅关注的是“协作式的、自然的流程切换”。

异步运行时就负责调度执行上述的协程对象。例如在一个协程在等待 IO 时,这个协程会主动出让自己的执行权给异步运行时,这时异步运行时可以调度运行其他的协程,从而最大化地利用 CPU 时间片。在 IO 密集型的应用中,异步编程将能够极大地提高执行效率

async/await 的使用#

async/await 是 Rust 中特殊的语法,它使得让出当前线程的控制权而不阻塞线程成为可能,从而允许在等待一个操作完成时可以运行其他代码。

简单代码

rust
use tokio::time::{sleep, Duration};
async fn foo() -> u8 {
sleep(Duration::from_secs(1)).await;
5
}
// 另一个异步函数,调用 `foo` 并等待其结果
async fn bar() -> u8 {
let result = foo().await;
result + 1
}
#[tokio::main]
async fn main() {
// 调用 `bar` 并等待其结果
let result = bar().await;
println!("Result: {}", result); // 输出: Result: 6
}

有两种主要的方式使用 async:async fn 和 async {}。这两中使用方式都会返回一个实现了 Future trait 的值:

rust
// `foo()` 返回一个实现了 `Future<Output = u8>` 的类型。
// `foo().await` 将会产生一个 u8 类型的值。
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// 这个 `async` 块会产生一个实现了 `Future<Output = u8>` 的类型。
async {
let x: u8 = foo().await;
x + 5
}
}

async fn 和 async {} 返回的 Future 是惰性的:在真正开始运行之前它什么也不会做。运行一个 Future 的最普遍的方式是 await 这个 Future: Future.await。 当 await 一个 Future 时,会暂停当前函数的运行,直到完成对 Future 的运行。如果这个 Future 被阻塞住了(例如等待网络 IO),它会让出当前线程的控制权。当 Future 中的阻塞操作就绪时( 例如等待的网络 IO 返回了响应),executor 会通过 poll 会恢复 Future 的运行。

async lifetime#

与普通的函数不一样,async fn 会获取引用或其他非静态生命周期的参数,然后返回被这些参数的生命周期约束的 Future

rust
async fn foo(x: &u8) -> u8 { *x }
// 这与上面的函数完全等价
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
  • <'a>:这是一个生命周期参数,表示引用 x 的生命周期。
  • x: &'a u8:函数参数 x 是一个生命周期为 'a 的 u8 类型的引用。
  • -> impl Future<Output = u8> + 'a:函数返回一个实现了 Future 特性的对象,该对象的输出类型为 u8,且具有生命周期 'a

这意味着,async fn 返回的 Future 必须在非静态生命周期参数仍然有效时 .await。在大多数情况下,我们在调用 async 函数后会立马 .await(例如 foo(&x).await),因此 async lifetime 不会对执行产生什么影响。 但是,如果我们存储这种 Future 或者发送给其他的 task 或者 thread,就可能会造成问题。

把带有引用参数的async fn 转化为静态 Future 的解决方法是:把参数和对 async fn 的调用封装到 async 块中:

rust
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR: `x` does not live long enough
}
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}

Rust 异步和其他语言的区别#

  • Rust 中 Futures 是惰性的,并且只有被轮询才会进一步执行。丢弃(Dropping)一个 future 可以阻止它继续执行。
  • Rust 中的 异步是零成本的,这意味着你只需要为你所使用的东西付出代价。特别来说,你使用异步时可以不需要堆分配或动态分发,这对性能来说是好事!这也使得你能够在约束环境下使用异步,例如嵌入式系统。
  • Rust 不提供内置运行时。相反,运行时由社区维护的库提供。
  • Rust 里 单线程的和多线程的 运行时都可用,而他们会有不同的优劣

Future trait#

rust
pub trait Future {
type Output; // Future计算完成时产生的值的类型
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Future 表示一个异步计算,或者说会在未来完成计算的操作。Future 的核心是 poll 方法,当调用 poll 方法时会尝试计算 Future 得到最终的值。 如果值还没有准备好(例如等待某些事件发生),则此方法不会阻塞,而是会直接返回一个结果表示 Future 还没有计算完毕

Poll#

当调用 Futurepoll 方法时会返回一个枚举类型的值:

  • Poll::Pending,表示这个 Future 还没计算完成
  • Poll::Ready(val),表示这个 Future 计算完毕,并附带计算结果:val

如果 Future 没有计算完成,例如想要等待一个 IO 事件发生,那么在 poll 方法体内,我们通常会调用传递给 poll 方法的 Contextwaker 方法拿到一个 Waker(通常把 Waker 叫做唤醒器),然后注册这个 Waker 到一个“事件通知系统”中,最后返回 Pending 表示 Future 没有计算完成。

在未来某一时刻,Future 等待的 IO 事件就绪了,那么“事件通知系统”就会利用我们注册的 Waker 通过某种唤醒机制唤醒这个 Future,通过 poll 继续计算执行该 Future

通过 Waker 唤醒器,我们可以只在 Future 想要等待的事件就绪时,才去唤醒 Future。这样我们就不需要通过一个死循环不断的调用 poll 方法来驱动 Future 的执行,这是异步编程之所以高效的关键所在。

rust
struct SocketRead<'a> {
socket: &'a Socket
}
impl<'a> Future for SocketRead<'a> {
type Output = Vec<u8>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_'>) -> Poll<Self::Output> {
let data = self.socket.no_block_read::<Option<Vec<u8>>>(1024);
match data {
Some(data) => Poll::Ready(data),
None => {
REACTOR.registe_waker_and_event(self.socket, Type::Read, cx.waker().clone());
Poll::Pending
}
}
}
}

代码中的 REACTOR 就是前文中所提到过的“事件通知系统”。当 socket 中有数据可读时,REACTOR就会使用注册的 Waker 唤醒负责 SocketRead ,然后调用 poll 方法再次计算该 Future

Rust 异步调试#

编译结果#

编译结果

rust 异步实现是一个无栈协程实现,所有的执行都是在工作线程执行的

  • Future执行使用工作线程的栈,无独立栈空间
  • Poll函数执行完毕,栈将被回收

无栈协程

执行的时候 工作线程会开栈,保存自己的寄存器和一些上下文的信息,获取异步任务执行的时候,调用 poll 函数,给 poll 函数创建一个栈,保存 poll函数里面的 变量和一些上下文信息。当 poll函数执行完毕, 无论返回的状态是 pending 还是 ready 栈将被回收 回到之前的执行逻辑上面去。这样就会有严重的缺点, poll 方法执行之后 栈就被回收了,只有在异步执行的过程中,用户才能观察到异步任务的状态。当异步任务挂起的时候,用户无法观察到异步任务的状态,这样就会导致调试困难。

定位问题

调测工具#

tokio-tracing#

tokio-tracing 提供了一种结构化的日志记录方式,可以捕获和记录异步任务的上下文信息

rust
use tracing::{info, instrument};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
#[instrument]
fn foo(ans: i32) {
info!("in foo");
}
fn main() {
tracing_subscriber::registry().with(fmt::layer()).init();
foo(42);
}

运行结果:

shell
2022-04-10T02:44:12.885556Z INFO foo{ans=42}: test_tracing: in foo

#[instrument] 宏会自动为函数生成跟踪信息,包括函数名和参数。info! 宏用于记录日志信息。

tokio-console#

rust
use std::{sync::Arc, time::Duration};
use tokio::{sync::Semaphore, task, time::sleep};
#[tokio::main]
async fn main() {
// 注意. 初始化tracing收集
console_subscriber::init();
// 线程1的令牌桶1初始一个令牌,可以先打印1
let semaphore = Arc::new(Semaphore::new(1));
let cnt = 3;
let semaphore2 = semaphore.clone();
// 线程2的令牌桶2初始没有令牌,直到1打印后增加令牌
let semaphore_wait = Arc::new(Semaphore::new(0));
let semaphore_wait2 = semaphore_wait.clone();
// 注意. 使用task::Builder来增加task名字,否则等同tokio::spawn
let t1 = task::Builder::default()
.name("t1")
.spawn(async move {
for i in 0..cnt {
let permit = semaphore.acquire().await.unwrap();
print!("1 ");
// 注意. 增加等待时间,便于观测
sleep(Duration::from_secs(i)).await;
// 消耗令牌,不放回令牌桶1
permit.forget();
// 令牌桶2增加令牌,可以打印2
semaphore_wait2.add_permits(1);
}
})
.unwrap();
let t2 = task::Builder::default()
.name("t2")
.spawn(async move {
for i in 0..cnt {
let permit = semaphore_wait.acquire().await.unwrap();
print!("2 ");
// 注意. 增加等待时间,便于观测
sleep(Duration::from_secs(i)).await;
// 消耗令牌,不放回令牌桶2
permit.forget();
// 令牌桶1增加令牌,可以打印1
semaphore2.add_permits(1);
}
})
.unwrap();
tokio::try_join!(t1, t2).unwrap();
}

result

await-tree#

rust
use std::time::Duration;
use await_tree::{Config, InstrumentAwait, Registry};
use futures::future::{join, pending};
use tokio::time::sleep;
async fn bar(i: i32) {
// `&'static str` span
baz(i).instrument_await("baz in bar").await
}
async fn baz(i: i32) {
// runtime `String` span is also supported
pending()
.instrument_await(format!("pending in baz {i}"))
.await
}
async fn foo() {
// spans of joined futures will be siblings in the tree
join(
bar(3).instrument_await("bar"),
baz(2).instrument_await("baz"),
)
.await;
}
#[tokio::main]
async fn main() {
let registry = Registry::new(Config::default());
let root = registry.register((), "foo");
tokio::spawn(root.instrument(foo()));
sleep(Duration::from_secs(1)).await;
let tree = registry.get(()).unwrap().to_string();
println!("{tree}");
}
bash
foo [1.003s]
bar [1.003s]
baz in bar [1.003s]
pending in baz 3 [1.003s]
baz [1.003s]
pending in baz 2 [1.003s]

openharmony#

诉求#

  • 可以检测任务阻塞和执行时间过长
  • 支持黑匣打印
  • 支持性能调优
  • 可以推出完整的异步栈
  • 运行态避免额外性能 & 内存开销
  • 易用性,避免大范围的侵入式修改

yinglong 框架#

yinglong

  • pending 状态 yinglong
  • 组合 yinglong
  • 任务栈 yinglong
  • 组合信息 yinglong

其他#

reactor