Знакомство с футурами в Rust и рантаймом Tokio
Недавно мы познакомились с многопоточностью в Rust. Увы, обыкновенные потоки операционной системы подходят не для всех задач. В частности, обслуживать с их помощью 10 000+ TCP-соединений будет не лучшей затеей. Причина заключается в высоких накладных расходах как на сами потоки, так и на переключение контекста между ними. Поэтому в Go и Erlang реализованы горутины и легковесные процессы соответственно.
Немного теории
Горутин в Rust не предусмотрено. Вместо них предлагается использовать футуры (futures). В первом приближении это похоже на то, как сделано в Scala. Легковесные потоки не встраиваются в язык, а реализуются в крейтах. Данный подход имеет свои преимущества и недостатки. С одной стороны, код получается чуть более многословным. С другой, существует больше одного способа реализовать легковесные потоки. Разработчик волен выбирать тот, что лучше подходит для задачи.
Футуры являются частью языка. Представляют собой обещания выполнить какую-то работу – вычисление, ввод-вывод и т.д. В Rust Future является трейтом / чертой, а не конкретным типом, как в Scala. Типы, реализующие Future, создаются на этапе компиляции. Они анонимны и скрыты от пользователя. Здесь все аналогично лямбдам и замыканиям. Конкретный тип лямбды мы тоже не знаем, но знаем, что он реализует Fn / FnMut / FnOnce. Также для работы с футурами язык предлагает кое-какой синтаксический сахар. Его мы рассмотрим далее.
Код, отвечающий за исполнение футур, находится в крейтах. Говорят, что крейт реализует среду выполнения для асинхронного кода, или асинхронный рантайм / asynchronous runtime. Непосвященный читатель может поинтересоваться, а причем тут асинхронность?
Любые легковесные потоки представляют собой виртуальные потоки, реализованные поверх тежеловесных потоков операционной системы. Легковесных потоков может быть несколько тысяч. Количество тяжеловесных обычно равно количеству ядер CPU, так как он физически не способен выполнять большее количество потоков одновременно. При использовании блокирующего API ввода-вывода будут блокироваться потоки операционной системы. Если одновременно выполнить столько операций ввода-вывода, сколько ядер у CPU, то легковесные потоки встанут. Поэтому в Go, Erlang и в любой другой реализации используется асинхронный, неблокирующий ввод-вывод.
Какие же рантаймы нам доступны? В свое время был довольно популярен async_std. Сейчас он задепрекейчен в пользу smol. Крейт Tokio в наши дни используется чаще всего. Это более тяжелый крейт, нежели smol, зато он умеет вообще все на свете. Как smol, так и Tokio работают на основе mio. Данный крейт представляет собой кроссплатформенную обертку к epoll, kqueue и IOCP. По сути, smol и Tokio являются аналогами libevent для Rust.
Также есть glommio, monoio и compio. Данные крейты ориентированны на Linux и интерфейс io_uring. Последний выгодно отличается от epoll, например, тем фактом, что позволяет работать асинхронно не только с сокетами, но и с файлами. Отличаются названные крейты сделанными в них компромиссами. В частности, на предмет поддержки или не поддержки операционных систем, отличных от Linux.
Особняком стоит Embassy. Это реализация кооперативной многозадачности для микроконтроллеров. Фактически, здесь асинхронный рантайм конкурирует с операционными системами для МК, такими как FreeRTOS. В настоящее время поддерживаются микроконтроллеры STM32, nRF, ESP32 и RP2040.
В рамках данного поста я буду использовать Tokio, как наиболее популярный рантайм. Однако забывать о существовании альтернативных рантаймов не стоит. В плане использования они не так уж сильно отличаются от Tokio.
Базовые примеры
Рассмотрим простейший пример использования футур и Tokio:
use tokio::time::{self,Duration};
use tokio::task;
async fn background_task(value: u32) -> u32 {
time::sleep(Duration::from_millis(500)).await;
task::yield_now().await;
value * 2
}
#[tokio::main]
async fn main() {
let handle1 = task::spawn(background_task(5));
let handle2 = task::spawn(background_task(10));
let res1 = handle1.await.unwrap();
println!("res1 = {}", res1);
let res2 = handle2.await.unwrap();
println!("res2 = {}", res2);
}
Данный код создает два легковесных потока, которые в Rust принято называть асинхронными задачами. Данные задачи выполняют «тяжелые» вычисления. Код дожидается их завершения, после чего выводит результат. Попробуем разобраться, как это работает.
На #[tokio::main] внимания не обращаем. Это всего лишь макрос, генерирующий шаблонный код с инициализацией рантайма. Данный код является деталью реализации Tokio. Нам его знать не обязательно. Осталось понять, что еще за async fn и .await такие. Все остальное вроде бы выглядит знакомо.
Начнем с async fn. Это пример того самого синтаксического сахара. Реализация background_task() без async fn выглядела бы как-то так:
// пока что упрощенная версия без sleep() и yield_now()
fn background_task(value: u32) -> impl Future<Output = u32> {
async move {
value * 2
}
}
Ключевое слово async оборачивает блок кода во Future. Имея Future, можно вызвать на нем .await. Хоть .await и похож на вызов метода, на самом деле это часть синтаксиса Rust. Выполнение кода приостанавливается на .await до тех пор, пока соответствующая футура не будет вычислена. Использовать .await можно только из асинхронного кода.
Полная версия background_task() использует несколько .await. Если попытаться переписать функцию без них, то получится что-то в таком духе:
fn background_task(value: u32) -> impl Future<Output = u32> {
time::sleep(Duration::from_millis(500))
.then(move |()| task::yield_now())
.map(move |()| value * 2)
}
Метод .then() объединяет футуры в цепочку последовательно выполняемых действий. Метод .map() делает то же самое, только его аргумент – это обычная лямбда, а не лямбда, возвращающая футуру. Эти и другие методы реализованы в крейте futures. В стандартной библиотеке они не нужны, потому что .await и удобнее, и реализован эффективнее.
Стоит сказать пару слов о функциях time::sleep() и task::yield_now(). Первая создает футуру, которая завершается через заданный интервал времени. Примечательна тем, что не занимает рантайм холостыми циклами. Вторая уступает рантайм другим футурам. Следует использовать ее в коде, который выполняется долго. Без yield_now() несколько тяжелых запросов к серверу (с сотней мегабайт данных для обработки на CPU) могут заблокировать остальные запросы. В приведенном коде обе функции использованы чисто для демонстрации.
Чтобы Future начала выполняться, следует передать ее функции task::spawn(), что мы и делаем в main(). Так происходит создание асинхронной задачи. Из task::spawn() возвращается JoinHandle<T>. Данный тип реализует трейт Future, за счет чего с ним работает .await. На этот раз результатом выполнения Future является Result. Поэтому в коде мы видим unwrap(). Когда задача паникует, то из JoinHandle мы получаем Err.
Другая важная функция – это task::spawn_blocking():
use tokio::task;
use std::time::Duration;
fn heavy_computation(value: u64) -> u64 {
std::thread::sleep(Duration::from_millis(500));
value * 2
}
#[tokio::main]
async fn main() {
let handle1 = task::spawn_blocking(|| heavy_computation(10));
let res1 = handle1.await.unwrap();
println!("res1 = {}", res1);
}
С ее помощью можно запускать обычный блокирующий код из асинхонного кода. Может пригодиться, например, если в проекте используются библиотеки на языке Си. Блокирующий код запускается на отдельном пуле потоков, и потому не затрагивает работу асинхронного кода. Tokio использует этот же пул для работы с файлами, консолью (stdin, stdout, stderr), а также, несколько контринтуитивно, для резолвинга доменных имен.
Количество блокируемых потоков в пуле меняется динамически. Имеется ограничение на максимальный размер пула. По умолчанию оно составляет 512 потоков.
Примитивы синхронизации
Футуры могут работать с атомарными переменными, но не с другими примитивами синхронизации из стандартной библиотеки. Ведь последние блокируют потоки операционной системы. По этой причине Tokio имеет свои реализации мьютексов, RWLock'ов и каналов.
Пример использования мьютексов:
use tokio::sync::Mutex;
use tokio::task;
static COUNTER: Mutex<i32> = Mutex::const_new(0);
#[tokio::main]
async fn main() {
let mut handles = vec![];
for _ in 0..3 {
let handle = task::spawn(async {
for _ in 0..1000 {
let mut num = COUNTER.lock().await;
*num += 1;
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let final_value = *COUNTER.lock().await;
println!("Counter: {}", final_value);
}
На что обратить внимание:
- В отличие от мьютексов стандартной библиотеки, мьютексы в Tokio не бывают отравленными. В случае паники мьютексы, захваченные задачей, отпускаются. Однако
.lock().awaitне информирует другие задачи о панике; - Мьютексы в Tokio справедливы. Задачи захватывают мьютексы в том порядке, в каком они вызывали
.lock().await. То есть, по принципу FIFO; - Выполнение задачи может прерваться на
.await, и возобновиться на другом потоке операционной системы. Тем не менее, удерживатьMutexGuardв точках приостановки совершенно безопасно; - Для взаимодействия с блокирующим кодом предусмотрен
.blocking_lock();
Пример с RWLock здесь не привожу, так как он не намного интереснее. Заинтересованные читатели найдут его в полной версии исходников к посту.
А вот на каналах остановимся поподробнее:
use rand::Rng;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::{self, Duration};
#[tokio::main]
async fn main() {
let (tx_task, mut rx_task) = mpsc::channel(32);
let (tx_result, mut rx_result) = mpsc::channel(32);
let handle = task::spawn(async move {
while let Some(value) = rx_task.recv().await {
let delay = rand::rng().random_range(0..1000);
time::sleep(Duration::from_millis(delay)).await;
let result = value * 2;
tx_result.send(result).await.unwrap();
}
});
let values = [5, 10, 15];
for value in values {
tx_task.send(value).await.unwrap();
}
drop(tx_task);
for value in values {
let result = rx_result.recv().await.unwrap();
println!("{} * 2 = {}", value, result);
}
handle.await.unwrap();
}
На что обратить внимание:
- Каналы по умолчанию ограниченные. Есть
mpsc::unbounded_channel(), однако он может съесть всю оперативную память. Поэтому им лучше не пользоваться; - В отличие от стандартной библиотеки, метод
.recv()требует эксклюзивного заимстованияReceiver. Поэтому пишемmut rx_taskиmut rx_result; - Также метод
.recv()возвращаетOptionвместоResult; - Для взаимодействия с блокирующим кодом есть
.blocking_send()и.blocking_recv();
Помимо названных примитивов синхронизации Tokio предлагает и другие, в частности watch, oneshot и broadcast. В сущности, это специализированные версии каналов. Заинтересованным читателям предлагается ознакомиться с ними самостоятельно.
Стримы
Мой рассказ о футурах был бы неполным без упоминания стримов. Говоря простыми словами, Stream – это асинхронный аналог итератора.
Например, Stream может быть создан из канала:
use rand::Rng;
use tokio::sync::mpsc;
use tokio::task;
use tokio::time::{self, Duration};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
async fn process_value(value: i32) -> i32 {
let delay = rand::rng().random_range(0..1000);
time::sleep(Duration::from_millis(delay)).await;
value * 2
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(32);
task::spawn(async move {
for value in [5, 10, 15, 20, 25] {
tx.send(value).await.unwrap();
}
});
let results: Vec<_> = ReceiverStream::new(rx)
.filter(|&x| x > 5)
.then(|x| process_value(x))
.collect()
.await;
for result in results {
println!("result = {}", result);
}
}
Стримы удобны при работе с файлами, приходящим по TCP потоком сообщений, и т.д. Больше информации вы найдете в документации на крейт tokio_stream. Крейт futures также предлагает реализацию стримов. Последняя не привязана к конкретному рантайму.
Заключение
Увы, рассмотреть все возможности Tokio в рамках одной статьи не представляется возможным. Как минимум, за кадром остались модули tokio::fs и tokio::net для работы с файлами и сетью соответственно. А ведь помимо самого Tokio есть еще и основанные на нем крейты – HTTP-клиент reqwest, клиент к PostgreSQL tokio-postgres, модульный веб-фреймворк axum, и многие другие.
Полную версию исходников к посту вы найдете в этом архиве. Как обычно, буду рад вашим вопросам и дополнениям.