名词解释

  • trait:Rust 中定义共享行为的方式,接近其他语言里的接口或抽象类。trait 可以定义一组方法,结构体、枚举或其他类型实现这个 trait 后,就能提供对应行为。

Rust 异步编程

异步编程是一种常见的并发模型。它可以在少量 OS 线程上运行大量并发任务,同时通过 async/await 保持接近同步代码的写法。

在 Rust 中,可以从进程、线程和协程三个层次理解异步模型:

异步编程

下层是进程,进程是持有资源的最小单位;中层是线程,线程是 CPU 调度的最小单位;上层是协程,协程关注协作式切换,把等待中的任务让出去,让运行时调度其他任务。

异步运行时负责调度这些协程对象。当某个协程等待 IO 时,它会把执行权交还给运行时,运行时再调度其他可运行的协程。对于 IO 密集型应用,这种方式能更充分地利用线程资源。

async/await 的使用

async/await 是 Rust 写异步代码的核心语法。它允许任务在等待某个操作完成时暂停自身,把线程让给其他任务。

一个简单例子:

 use tokio::pubDate::{sleep, Duration};
async fn foo() -> u8 {
    sleep(Duration::from_secs(1)).await;
    5
}
// 另一个异步函数,调用 `foo` 并等待其结果
async fn bar() -> u8 {
    let result = foo().await;
    result + 1
}

#[tokio::main]
async fn main() {
    // 调用 `bar` 并等待其结果
    let result = bar().await;
    println!("Result: {}", result); // 输出: Result: 6
} 

Rust 中常见的 async 写法有两种: async fn async {} 。两种写法都会返回一个实现了 Future trait 的值:

 // `foo()` 返回一个实现了 `Future<Output = u8>` 的类型。
// `foo().await` 将会产生一个 u8 类型的值。
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // 这个 `async` 块会产生一个实现了 `Future<Output = u8>` 的类型。
    async {
        let x: u8 = foo().await;
        x + 5
    }
}
 

async fn async {} 返回的 Future 都是惰性的。在被轮询之前,它们不会主动运行。最常见的驱动方式是对 Future 调用 .await

当代码 .await 一个 Future 时,当前异步函数会暂停执行。如果这个 Future 正在等待网络 IO,它会把线程让出来。等 IO 就绪后,executor 会通过 poll 恢复这个 Future

async lifetime

async fn 和普通函数的一个差异在生命周期上。它可以接收引用或其他非静态生命周期参数,然后返回一个受这些参数生命周期约束的 Future

 async fn foo(x: &u8) -> u8 { *x }

// 这与上面的函数完全等价
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
} 
  • <'a> :生命周期参数,表示引用 x 的生命周期。
  • x: &'a u8 :参数 x 是一个生命周期为 'a u8 引用。
  • -> impl Future<Output = u8> + 'a :函数返回一个实现了 Future 的对象,输出类型是 u8 ,生命周期受 'a 约束。

这意味着 async fn 返回的 Future 必须在相关引用仍然有效时 .await 。大多数情况下,我们会在调用后立即 .await ,比如 foo(&x).await ,这时生命周期问题很少暴露。

如果要把这种 Future 存起来,或者发送给其他 task / thread ,生命周期就会变得重要。常见做法是把参数和 async fn 调用一起封装进 async 块:

 fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
} 

Rust 异步和其他语言的区别

  • Rust 中的 Future 是惰性的,只有被轮询时才会继续执行。丢弃一个 Future 可以阻止它继续运行。
  • Rust 异步模型强调零成本抽象。使用异步时可以避免不必要的堆分配和动态分发,这对性能和嵌入式场景都有帮助。
  • Rust 不提供内置运行时。相反,运行时由社区维护的库提供。
  • Rust 同时支持单线程和多线程运行时,选择取决于任务类型和部署环境。

Future trait

 pub trait Future {
    type Output;	// Future计算完成时产生的值的类型
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
} 

Future 表示一个异步计算,也就是未来某个时刻会完成的操作。它的核心方法是 poll 。调用 poll 时,运行时会尝试推进这个 Future ,并判断它是否已经产出结果。

如果结果还没有准备好,比如正在等待某个事件, poll 不会阻塞线程,而是返回一个状态,表示这个 Future 还需要继续等待。

Poll

当调用 Future poll 方法时会返回一个枚举类型的值:

  • Poll::Pending :这个 Future 还没完成。
  • Poll::Ready(val) :这个 Future 已经完成,并返回结果 val

如果 Future 还在等待 IO 事件, poll 方法通常会从 Context 中取出 Waker ,把它注册到事件通知系统里,然后返回 Pending

等 IO 事件就绪后,事件通知系统会使用之前注册的 Waker 唤醒这个 Future ,executor 再次调用 poll 推进执行。

Waker 让 executor 只在事件就绪时唤醒任务,避免用死循环反复调用 poll 。这也是异步运行时能高效处理大量 IO 任务的关键。

 struct SocketRead<'a> {
    socket: &'a Socket
}

impl<'a> Future for SocketRead<'a> {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_'>) -> Poll<Self::Output> {
        let data = self.socket.no_block_read::<Option<Vec<u8>>>(1024);
        match data {
            Some(data) => Poll::Ready(data),
            None => {
                REACTOR.registe_waker_and_event(self.socket, Type::Read, cx.waker().clone());
                Poll::Pending
            }
        }
    }
} 

代码中的 REACTOR 就是前面提到的事件通知系统。当 socket 有数据可读时, REACTOR 会使用注册的 Waker 唤醒 SocketRead ,然后由 executor 再次调用 poll

Rust 异步调试

编译结果

编译结果

Rust 异步是无栈协程实现,所有执行都发生在工作线程上:

  • Future 执行时使用工作线程的栈,没有独立栈空间。
  • poll 函数执行完毕后,本次调用栈会被回收。

无栈协程

执行异步任务时,工作线程会调用 poll ,并为这次调用建立栈帧,保存 poll 内部变量和上下文信息。 poll 返回后,无论状态是 Pending 还是 Ready ,这段调用栈都会被回收。

这会带来调试难点:任务正在执行时可以观察调用栈,任务挂起后,工作线程栈已经回到运行时逻辑,普通调试器很难看到完整的异步任务状态。

定位问题

调测工具

tokio-tracing

tokio-tracing 提供结构化日志能力,可以记录异步任务的上下文信息。

 use tracing::{info, instrument};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};

#[instrument]
fn foo(ans: i32) {
    info!("in foo");
}

fn main() {
    tracing_subscriber::registry().with(fmt::layer()).init();
    foo(42);
} 

运行结果:

 2022-04-10T02:44:12.885556Z  INFO foo{ans=42}: test_tracing: in foo 

#[instrument] 宏会自动生成函数跟踪信息,包括函数名和参数。 info! 宏用于写入日志。

tokio-console

 use std::{sync::Arc, pubDate::Duration};
use tokio::{sync::Semaphore, task, pubDate::sleep};

#[tokio::main]
async fn main() {
    // 注意. 初始化tracing收集
    console_subscriber::init();
    // 线程1的令牌桶1初始一个令牌,可以先打印1
    let semaphore = Arc::new(Semaphore::new(1));
    let cnt = 3;
    let semaphore2 = semaphore.clone();

    // 线程2的令牌桶2初始没有令牌,直到1打印后增加令牌
    let semaphore_wait = Arc::new(Semaphore::new(0));
    let semaphore_wait2 = semaphore_wait.clone();

    // 注意. 使用task::Builder来增加task名字,否则等同tokio::spawn
    let t1 = task::Builder::default()
        .name("t1")
        .spawn(async move {
            for i in 0..cnt {
                let permit = semaphore.acquire().await.unwrap();
                print!("1 ");
                // 注意. 增加等待时间,便于观测
                sleep(Duration::from_secs(i)).await;
                // 消耗令牌,不放回令牌桶1
                permit.forget();
                // 令牌桶2增加令牌,可以打印2
                semaphore_wait2.add_permits(1);
            }
        })
        .unwrap();

    let t2 = task::Builder::default()
        .name("t2")
        .spawn(async move {
            for i in 0..cnt {
                let permit = semaphore_wait.acquire().await.unwrap();
                print!("2 ");
                // 注意. 增加等待时间,便于观测
                sleep(Duration::from_secs(i)).await;
                // 消耗令牌,不放回令牌桶2
                permit.forget();
                // 令牌桶1增加令牌,可以打印1
                semaphore2.add_permits(1);
            }
        })
        .unwrap();

    tokio::try_join!(t1, t2).unwrap();
}
 

result

await-tree

 use std::pubDate::Duration;

use await_tree::{Config, InstrumentAwait, Registry};
use futures::future::{join, pending};
use tokio::pubDate::sleep;

async fn bar(i: i32) {
    // `&'static str` span
    baz(i).instrument_await("baz in bar").await
}

async fn baz(i: i32) {
    // runtime `String` span is also supported
    pending()
        .instrument_await(format!("pending in baz {i}"))
        .await
}

async fn foo() {
    // spans of joined futures will be siblings in the tree
    join(
        bar(3).instrument_await("bar"),
        baz(2).instrument_await("baz"),
    )
    .await;
}

#[tokio::main]
async fn main() {
    let registry = Registry::new(Config::default());
    let root = registry.register((), "foo");
    tokio::spawn(root.instrument(foo()));

    sleep(Duration::from_secs(1)).await;
    let tree = registry.get(()).unwrap().to_string();
    println!("{tree}");
}
 
 foo [1.003s]
  bar [1.003s]
    baz in bar [1.003s]
      pending in baz 3 [1.003s]
  baz [1.003s]
    pending in baz 2 [1.003s] 

openharmony

诉求

  • 可以检测任务阻塞和执行时间过长
  • 支持黑匣打印
  • 支持性能调优
  • 可以推出完整的异步栈
  • 运行态避免额外性能 & 内存开销
  • 易用性,避免大范围的侵入式修改

yinglong 框架

yinglong

  • pending 状态 yinglong
  • 组合 yinglong
  • 任务栈 yinglong
  • 组合信息 yinglong

其他

reactor