Rust 异步编程
名次解释
- trait: 是一种定义共享行为的方式,它类似于其他编程语言中的接口(interface)或抽象类(abstract class)。trait 允许你定义一组方法,这些方法可以被任何类型的结构体、枚举或实现该 trait 的类型所使用
Rust 异步编程
异步编程,或者叫异步,是一种被越来越多编程语言支持的并发编程模型。它能够在一小撮 OS 线程上运行一大堆并发任务,同时还能通过 async/await 语法,保持原本同步编程的观感。
在 Rust 中主要应用的是 进程—线程—协程 异步模型,如下所示:
下层是进程,进程是持有资源的最小单位;中层是线程,线程不持有资源,是 CPU 调度的最小单位;上层是协程,协程既不持有资源、也不在意 CPU 的调度,它仅仅关注的是“协作式的、自然的流程切换”。
异步运行时就负责调度执行上述的协程对象。例如在一个协程在等待 IO 时,这个协程会主动出让自己的执行权给异步运行时,这时异步运行时可以调度运行其他的协程,从而最大化地利用 CPU 时间片。在 IO 密集型的应用中,异步编程将能够极大地提高执行效率
async/await 的使用
async/await 是 Rust 中特殊的语法,它使得让出当前线程的控制权而不阻塞线程成为可能,从而允许在等待一个操作完成时可以运行其他代码。
简单代码
use tokio::pubDate::{sleep, Duration};
async fn foo() -> u8 {
sleep(Duration::from_secs(1)).await;
5
}
// 另一个异步函数,调用 `foo` 并等待其结果
async fn bar() -> u8 {
let result = foo().await;
result + 1
}
#[tokio::main]
async fn main() {
// 调用 `bar` 并等待其结果
let result = bar().await;
println!("Result: {}", result); // 输出: Result: 6
}
有两种主要的方式使用 async:async fn 和 async {} 。这两中使用方式都会返回一个实现了 Future trait 的值:
// `foo()` 返回一个实现了 `Future<Output = u8>` 的类型。
// `foo().await` 将会产生一个 u8 类型的值。
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// 这个 `async` 块会产生一个实现了 `Future<Output = u8>` 的类型。
async {
let x: u8 = foo().await;
x + 5
}
}
async fn 和 async {} 返回的 Future 是惰性的:在真正开始运行之前它什么也不会做。运行一个 Future 的最普遍的方式是 await 这个 Future: Future.await 。
当 await 一个 Future 时,会暂停当前函数的运行,直到完成对 Future 的运行。如果这个 Future 被阻塞住了(例如等待网络 IO),它会让出当前线程的控制权。当 Future 中的阻塞操作就绪时(
例如等待的网络 IO 返回了响应),executor 会通过 poll 会恢复 Future 的运行。
async lifetime
与普通的函数不一样, async fn 会获取引用或其他非静态生命周期的参数,然后返回被这些参数的生命周期约束的 Future :
async fn foo(x: &u8) -> u8 { *x }
// 这与上面的函数完全等价
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
-
<'a>:这是一个生命周期参数,表示引用 x 的生命周期。 - x:
&'a u8:函数参数 x 是一个生命周期为 ‘a 的 u8 类型的引用。 -
-> impl Future<Output = u8> + 'a:函数返回一个实现了Future特性的对象,该对象的输出类型为u8,且具有生命周期'a。
这意味着, async fn 返回的 Future 必须在非静态生命周期参数仍然有效时 .await 。在大多数情况下,我们在调用 async 函数后会立马 .await(例如 foo(&x).await) ,因此 async lifetime 不会对执行产生什么影响。
但是,如果我们存储这种 Futur e 或者发送给其他的 task 或者 thread ,就可能会造成问题。
把带有引用参数的 async fn 转化为静态 Future 的解决方法是:把参数和对 async fn 的调用封装到 async 块中:
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR: `x` does not live long enough
}
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
Rust 异步和其他语言的区别
- Rust 中
Futures是惰性的,并且只有被轮询才会进一步执行。丢弃(Dropping)一个 future 可以阻止它继续执行。 - Rust 中的 异步是零成本的,这意味着你只需要为你所使用的东西付出代价。特别来说,你使用异步时可以不需要堆分配或动态分发,这对性能来说是好事!这也使得你能够在约束环境下使用异步,例如嵌入式系统。
- Rust 不提供内置运行时。相反,运行时由社区维护的库提供。
- Rust 里 单线程的和多线程的 运行时都可用,而他们会有不同的优劣
Future trait
pub trait Future {
type Output; // Future计算完成时产生的值的类型
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Future 表示一个异步计算,或者说会在未来完成计算的操作。 Future 的核心是 poll 方法,当调用 poll 方法时会尝试计算 Future 得到最终的值。
如果值还没有准备好(例如等待某些事件发生),则此方法不会阻塞,而是会直接返回一个结果表示 Future 还没有计算完毕
Poll
当调用 Future 的 poll 方法时会返回一个枚举类型的值:
-
Poll::Pending,表示这个Future还没计算完成 -
Poll::Ready(val),表示这个Future计算完毕,并附带计算结果:val
如果 Future 没有计算完成,例如想要等待一个 IO 事件发生,那么在 poll 方法体内,我们通常会调用传递给 poll 方法的 Context 的 waker 方法拿到一个 Wake r(通常把 Waker 叫做唤醒器),然后注册这个 Waker 到一个“事件通知系统”中,最后返回 Pending 表示 Future 没有计算完成。
在未来某一时刻, Future 等待的 IO 事件就绪了,那么“事件通知系统”就会利用我们注册的 Waker 通过某种唤醒机制唤醒这个 Future ,通过 poll 继续计算执行该 Future 。
通过 Waker 唤醒器,我们可以只在 Future 想要等待的事件就绪时,才去唤醒 Future 。这样我们就不需要通过一个死循环不断的调用 poll 方法来驱动 Future 的执行,这是异步编程之所以高效的关键所在。
struct SocketRead<'a> {
socket: &'a Socket
}
impl<'a> Future for SocketRead<'a> {
type Output = Vec<u8>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_'>) -> Poll<Self::Output> {
let data = self.socket.no_block_read::<Option<Vec<u8>>>(1024);
match data {
Some(data) => Poll::Ready(data),
None => {
REACTOR.registe_waker_and_event(self.socket, Type::Read, cx.waker().clone());
Poll::Pending
}
}
}
}
代码中的 REACTOR 就是前文中所提到过的“事件通知系统”。当 socket 中有数据可读时, REACTOR 就会使用注册的 Wake r 唤醒负责 SocketRead ,然后调用 poll 方法再次计算该 Future 。
Rust 异步调试
编译结果
rust 异步实现是一个无栈协程实现,所有的执行都是在工作线程执行的
-
Future执行使用工作线程的栈,无独立栈空间 -
Poll函数执行完毕,栈将被回收
执行的时候 工作线程会开栈,保存自己的寄存器和一些上下文的信息,获取异步任务执行的时候,调用 poll 函数,给 poll 函数创建一个栈,保存 poll 函数里面的
变量和一些上下文信息。当 poll 函数执行完毕, 无论返回的状态是 pending 还是 ready 栈将被回收 回到之前的执行逻辑上面去。这样就会有严重的缺点, poll 方法执行之后
栈就被回收了,只有在异步执行的过程中,用户才能观察到异步任务的状态。当异步任务挂起的时候,用户无法观察到异步任务的状态,这样就会导致调试困难。
调测工具
tokio-tracing
tokio-tracing 提供了一种结构化的日志记录方式,可以捕获和记录异步任务的上下文信息
use tracing::{info, instrument};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};
#[instrument]
fn foo(ans: i32) {
info!("in foo");
}
fn main() {
tracing_subscriber::registry().with(fmt::layer()).init();
foo(42);
}
运行结果:
2022-04-10T02:44:12.885556Z INFO foo{ans=42}: test_tracing: in foo
#[instrument] 宏会自动为函数生成跟踪信息,包括函数名和参数。 info! 宏用于记录日志信息。
tokio-console
use std::{sync::Arc, pubDate::Duration};
use tokio::{sync::Semaphore, task, pubDate::sleep};
#[tokio::main]
async fn main() {
// 注意. 初始化tracing收集
console_subscriber::init();
// 线程1的令牌桶1初始一个令牌,可以先打印1
let semaphore = Arc::new(Semaphore::new(1));
let cnt = 3;
let semaphore2 = semaphore.clone();
// 线程2的令牌桶2初始没有令牌,直到1打印后增加令牌
let semaphore_wait = Arc::new(Semaphore::new(0));
let semaphore_wait2 = semaphore_wait.clone();
// 注意. 使用task::Builder来增加task名字,否则等同tokio::spawn
let t1 = task::Builder::default()
.name("t1")
.spawn(async move {
for i in 0..cnt {
let permit = semaphore.acquire().await.unwrap();
print!("1 ");
// 注意. 增加等待时间,便于观测
sleep(Duration::from_secs(i)).await;
// 消耗令牌,不放回令牌桶1
permit.forget();
// 令牌桶2增加令牌,可以打印2
semaphore_wait2.add_permits(1);
}
})
.unwrap();
let t2 = task::Builder::default()
.name("t2")
.spawn(async move {
for i in 0..cnt {
let permit = semaphore_wait.acquire().await.unwrap();
print!("2 ");
// 注意. 增加等待时间,便于观测
sleep(Duration::from_secs(i)).await;
// 消耗令牌,不放回令牌桶2
permit.forget();
// 令牌桶1增加令牌,可以打印1
semaphore2.add_permits(1);
}
})
.unwrap();
tokio::try_join!(t1, t2).unwrap();
}
await-tree
use std::pubDate::Duration;
use await_tree::{Config, InstrumentAwait, Registry};
use futures::future::{join, pending};
use tokio::pubDate::sleep;
async fn bar(i: i32) {
// `&'static str` span
baz(i).instrument_await("baz in bar").await
}
async fn baz(i: i32) {
// runtime `String` span is also supported
pending()
.instrument_await(format!("pending in baz {i}"))
.await
}
async fn foo() {
// spans of joined futures will be siblings in the tree
join(
bar(3).instrument_await("bar"),
baz(2).instrument_await("baz"),
)
.await;
}
#[tokio::main]
async fn main() {
let registry = Registry::new(Config::default());
let root = registry.register((), "foo");
tokio::spawn(root.instrument(foo()));
sleep(Duration::from_secs(1)).await;
let tree = registry.get(()).unwrap().to_string();
println!("{tree}");
}
foo [1.003s]
bar [1.003s]
baz in bar [1.003s]
pending in baz 3 [1.003s]
baz [1.003s]
pending in baz 2 [1.003s]
openharmony
诉求
- 可以检测任务阻塞和执行时间过长
- 支持黑匣打印
- 支持性能调优
- 可以推出完整的异步栈
- 运行态避免额外性能 & 内存开销
- 易用性,避免大范围的侵入式修改
yinglong 框架
- pending 状态
- 组合
- 任务栈
- 组合信息
其他
This work is licensed under CC BY-NC-SA 4.0