Rust 异步编程
名次解释#
- trait: 是一种定义共享行为的方式,它类似于其他编程语言中的接口(interface)或抽象类(abstract class)。trait 允许你定义一组方法,这些方法可以被任何类型的结构体、枚举或实现该 trait 的类型所使用
Rust 异步编程#
异步编程,或者叫异步,是一种被越来越多编程语言支持的并发编程模型。它能够在一小撮 OS 线程上运行一大堆并发任务,同时还能通过 async/await
语法,保持原本同步编程的观感。
在 Rust 中主要应用的是 进程—线程—协程 异步模型,如下所示:
下层是进程,进程是持有资源的最小单位;中层是线程,线程不持有资源,是 CPU 调度的最小单位;上层是协程,协程既不持有资源、也不在意 CPU 的调度,它仅仅关注的是“协作式的、自然的流程切换”。
异步运行时就负责调度执行上述的协程对象。例如在一个协程在等待 IO 时,这个协程会主动出让自己的执行权给异步运行时,这时异步运行时可以调度运行其他的协程,从而最大化地利用 CPU 时间片。在 IO 密集型的应用中,异步编程将能够极大地提高执行效率
async/await 的使用#
async/await
是 Rust 中特殊的语法,它使得让出当前线程的控制权而不阻塞线程成为可能,从而允许在等待一个操作完成时可以运行其他代码。
简单代码
rust
use tokio::time::{sleep, Duration};async fn foo() -> u8 {sleep(Duration::from_secs(1)).await;5}// 另一个异步函数,调用 `foo` 并等待其结果async fn bar() -> u8 {let result = foo().await;result + 1}#[tokio::main]async fn main() {// 调用 `bar` 并等待其结果let result = bar().await;println!("Result: {}", result); // 输出: Result: 6}
有两种主要的方式使用 async:async fn 和 async {}
。这两中使用方式都会返回一个实现了 Future trait
的值:
rust
// `foo()` 返回一个实现了 `Future<Output = u8>` 的类型。// `foo().await` 将会产生一个 u8 类型的值。async fn foo() -> u8 { 5 }fn bar() -> impl Future<Output = u8> {// 这个 `async` 块会产生一个实现了 `Future<Output = u8>` 的类型。async {let x: u8 = foo().await;x + 5}}
async fn 和 async {}
返回的 Future
是惰性的:在真正开始运行之前它什么也不会做。运行一个 Future
的最普遍的方式是 await
这个 Future: Future.await
。
当 await
一个 Future
时,会暂停当前函数的运行,直到完成对 Future 的运行。如果这个 Future 被阻塞住了(例如等待网络 IO),它会让出当前线程的控制权。当 Future 中的阻塞操作就绪时(
例如等待的网络 IO 返回了响应),executor 会通过 poll
会恢复 Future
的运行。
async lifetime#
与普通的函数不一样,async fn
会获取引用或其他非静态生命周期的参数,然后返回被这些参数的生命周期约束的 Future
:
rust
async fn foo(x: &u8) -> u8 { *x }// 这与上面的函数完全等价fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {async move { *x }}
<'a>
:这是一个生命周期参数,表示引用 x 的生命周期。- x:
&'a u8
:函数参数 x 是一个生命周期为 'a 的 u8 类型的引用。 -> impl Future<Output = u8> + 'a:
函数返回一个实现了Future
特性的对象,该对象的输出类型为u8
,且具有生命周期'a
。
这意味着,async fn
返回的 Future
必须在非静态生命周期参数仍然有效时 .await
。在大多数情况下,我们在调用 async
函数后会立马 .await(例如 foo(&x).await)
,因此 async lifetime
不会对执行产生什么影响。
但是,如果我们存储这种 Futur
e 或者发送给其他的 task
或者 thread
,就可能会造成问题。
把带有引用参数的async fn
转化为静态 Future
的解决方法是:把参数和对 async fn
的调用封装到 async
块中:
rust
fn bad() -> impl Future<Output = u8> {let x = 5;borrow_x(&x) // ERROR: `x` does not live long enough}fn good() -> impl Future<Output = u8> {async {let x = 5;borrow_x(&x).await}}
Rust 异步和其他语言的区别#
- Rust 中
Futures
是惰性的,并且只有被轮询才会进一步执行。丢弃(Dropping)一个 future 可以阻止它继续执行。 - Rust 中的 异步是零成本的,这意味着你只需要为你所使用的东西付出代价。特别来说,你使用异步时可以不需要堆分配或动态分发,这对性能来说是好事!这也使得你能够在约束环境下使用异步,例如嵌入式系统。
- Rust 不提供内置运行时。相反,运行时由社区维护的库提供。
- Rust 里 单线程的和多线程的 运行时都可用,而他们会有不同的优劣
Future trait#
rust
pub trait Future {type Output; // Future计算完成时产生的值的类型fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;}
Future
表示一个异步计算,或者说会在未来完成计算的操作。Future
的核心是 poll
方法,当调用 poll
方法时会尝试计算 Future
得到最终的值。
如果值还没有准备好(例如等待某些事件发生),则此方法不会阻塞,而是会直接返回一个结果表示 Future 还没有计算完毕
Poll#
当调用 Future
的 poll
方法时会返回一个枚举类型的值:
Poll::Pending
,表示这个Future
还没计算完成Poll::Ready(val)
,表示这个Future
计算完毕,并附带计算结果:val
如果 Future
没有计算完成,例如想要等待一个 IO
事件发生,那么在 poll
方法体内,我们通常会调用传递给 poll
方法的 Context
的 waker
方法拿到一个 Wake
r(通常把 Waker 叫做唤醒器),然后注册这个 Waker
到一个“事件通知系统”中,最后返回 Pending
表示 Future
没有计算完成。
在未来某一时刻,Future
等待的 IO
事件就绪了,那么“事件通知系统”就会利用我们注册的 Waker
通过某种唤醒机制唤醒这个 Future
,通过 poll
继续计算执行该 Future
。
通过 Waker
唤醒器,我们可以只在 Future
想要等待的事件就绪时,才去唤醒 Future
。这样我们就不需要通过一个死循环不断的调用 poll
方法来驱动 Future
的执行,这是异步编程之所以高效的关键所在。
rust
struct SocketRead<'a> {socket: &'a Socket}impl<'a> Future for SocketRead<'a> {type Output = Vec<u8>;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_'>) -> Poll<Self::Output> {let data = self.socket.no_block_read::<Option<Vec<u8>>>(1024);match data {Some(data) => Poll::Ready(data),None => {REACTOR.registe_waker_and_event(self.socket, Type::Read, cx.waker().clone());Poll::Pending}}}}
代码中的 REACTOR
就是前文中所提到过的“事件通知系统”。当 socket
中有数据可读时,REACTOR
就会使用注册的 Wake
r 唤醒负责 SocketRead
,然后调用 poll
方法再次计算该 Future
。
Rust 异步调试#
编译结果#
rust 异步实现是一个无栈协程实现,所有的执行都是在工作线程执行的
Future
执行使用工作线程的栈,无独立栈空间Poll
函数执行完毕,栈将被回收
执行的时候 工作线程会开栈,保存自己的寄存器和一些上下文的信息,获取异步任务执行的时候,调用 poll 函数,给 poll 函数创建一个栈,保存 poll
函数里面的
变量和一些上下文信息。当 poll
函数执行完毕, 无论返回的状态是 pending
还是 ready
栈将被回收 回到之前的执行逻辑上面去。这样就会有严重的缺点, poll 方法执行之后
栈就被回收了,只有在异步执行的过程中,用户才能观察到异步任务的状态。当异步任务挂起的时候,用户无法观察到异步任务的状态,这样就会导致调试困难。
调测工具#
tokio-tracing#
tokio-tracing 提供了一种结构化的日志记录方式,可以捕获和记录异步任务的上下文信息
rust
use tracing::{info, instrument};use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};#[instrument]fn foo(ans: i32) {info!("in foo");}fn main() {tracing_subscriber::registry().with(fmt::layer()).init();foo(42);}
运行结果:
shell
2022-04-10T02:44:12.885556Z INFO foo{ans=42}: test_tracing: in foo
#[instrument]
宏会自动为函数生成跟踪信息,包括函数名和参数。info!
宏用于记录日志信息。
tokio-console#
rust
use std::{sync::Arc, time::Duration};use tokio::{sync::Semaphore, task, time::sleep};#[tokio::main]async fn main() {// 注意. 初始化tracing收集console_subscriber::init();// 线程1的令牌桶1初始一个令牌,可以先打印1let semaphore = Arc::new(Semaphore::new(1));let cnt = 3;let semaphore2 = semaphore.clone();// 线程2的令牌桶2初始没有令牌,直到1打印后增加令牌let semaphore_wait = Arc::new(Semaphore::new(0));let semaphore_wait2 = semaphore_wait.clone();// 注意. 使用task::Builder来增加task名字,否则等同tokio::spawnlet t1 = task::Builder::default().name("t1").spawn(async move {for i in 0..cnt {let permit = semaphore.acquire().await.unwrap();print!("1 ");// 注意. 增加等待时间,便于观测sleep(Duration::from_secs(i)).await;// 消耗令牌,不放回令牌桶1permit.forget();// 令牌桶2增加令牌,可以打印2semaphore_wait2.add_permits(1);}}).unwrap();let t2 = task::Builder::default().name("t2").spawn(async move {for i in 0..cnt {let permit = semaphore_wait.acquire().await.unwrap();print!("2 ");// 注意. 增加等待时间,便于观测sleep(Duration::from_secs(i)).await;// 消耗令牌,不放回令牌桶2permit.forget();// 令牌桶1增加令牌,可以打印1semaphore2.add_permits(1);}}).unwrap();tokio::try_join!(t1, t2).unwrap();}
await-tree#
rust
use std::time::Duration;use await_tree::{Config, InstrumentAwait, Registry};use futures::future::{join, pending};use tokio::time::sleep;async fn bar(i: i32) {// `&'static str` spanbaz(i).instrument_await("baz in bar").await}async fn baz(i: i32) {// runtime `String` span is also supportedpending().instrument_await(format!("pending in baz {i}")).await}async fn foo() {// spans of joined futures will be siblings in the treejoin(bar(3).instrument_await("bar"),baz(2).instrument_await("baz"),).await;}#[tokio::main]async fn main() {let registry = Registry::new(Config::default());let root = registry.register((), "foo");tokio::spawn(root.instrument(foo()));sleep(Duration::from_secs(1)).await;let tree = registry.get(()).unwrap().to_string();println!("{tree}");}
bash
foo [1.003s]bar [1.003s]baz in bar [1.003s]pending in baz 3 [1.003s]baz [1.003s]pending in baz 2 [1.003s]
openharmony#
诉求#
- 可以检测任务阻塞和执行时间过长
- 支持黑匣打印
- 支持性能调优
- 可以推出完整的异步栈
- 运行态避免额外性能 & 内存开销
- 易用性,避免大范围的侵入式修改
yinglong 框架#
- pending 状态
- 组合
- 任务栈
- 组合信息