Шпаргалка по многопоточности в языке Rust
Язык Rust предлагает множество примитивов для написания многопоточного кода. В первом приближении, они напоминают соответствующие примитивы в C++. Однако имеется ряд важных отличий. Попробуем во всем разобраться на простых примерах.
Создание и завершение потоков
Рассмотрим следующий код:
use rand::Rng;
use std::thread;
use std::time::Duration;
fn main() {
let values = [5, 10, 15];
let mut handles = vec![];
for value in values {
let handle = thread::spawn(move || {
let delay = rand::thread_rng().gen_range(0..1000);
thread::sleep(Duration::from_millis(delay));
value * 2
});
handles.push(handle);
}
for (i, handle) in handles.into_iter().enumerate() {
let result = handle.join().unwrap();
println!("Thread {}: {} * 2 = {}", i + 1, values[i], result);
}
}
Здесь мы создаем три потока, выполняющих «сложные» вычисления. Вычисления представляют собой умножение переданного числа на два, а сложность изображается через thread::sleep.
За создание потоков отвечает thread::spawn. В качестве аргумента передается лямбда-функция, которая принимает параметр value через замыкание. Работа с лямбдами и замыканиями в Rust всегда безопасна, если только не используется unsafe{}. Функция возвращает JoinHandle<T>, где T – это тип, возвращаемый лямбдой.
Чтобы получить значение, которое вернул поток, на JoinHandle<T> вызывается метод .join(). Он возвращает Result<T>. Если в потоке произошла паника (например, он пытался делить на ноль), то .join() возвращает значение Err.
Паника в Rust – это определенное поведение. Когда поток паникует, его выполнение завершается, однако это не затрагивает работу остальных потоков. То есть, паники локализованы в потоках. Паники можно игнорировать, обрабатывать или распространять далее, на усмотрение разработчика. В данном случае мы делаем .join().unwrap(), что означает распространять панику.
Завершение основного потока приводит к завершению работы программы.
Каналы
Каналы в Rust похожи на каналы в Go и очереди сообщений в Erlang. Пример использования:
use rand::Rng;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx_task, rx_task) = mpsc::channel();
let (tx_result, rx_result) = mpsc::channel();
let handle = thread::spawn(move || {
while let Ok(value) = rx_task.recv() {
let delay = rand::thread_rng().gen_range(0..1000);
thread::sleep(Duration::from_millis(delay));
let result = value * 2;
tx_result.send(result).unwrap();
}
});
let values = [5, 10, 15];
for value in values {
tx_task.send(value).unwrap();
}
drop(tx_task); // закрыли канал
for value in values {
let result = rx_result.recv().unwrap();
println!("{} * 2 = {}", value, result);
}
handle.join().unwrap();
}
Здесь за выполнение «тяжелых» вычислений отвечает один поток. Он получает задания из первого канала и отправляет результаты во второй канал. Соответственно, родительский поток шлет задания в первый канал и забирает результаты из второго.
Вызов mpsc::channel() возвращает пару из Sender<T> и Receiver<T>. Пишем в канал через Sender, читаем через Receiver. Если нужно несколько отправителей, то вызываем на Sender метод .clone(). Однако Receiver может быть только один. Чтобы несколько потоков могли читать из канала, Receiver следует обернуть в мьютекс. Мьютексы мы рассмотрим далее по тексту.
Sender и Receiver имеют методы .send() и .recv() соответственно. Оба метода возвращают Result. Sender возвращает ошибку, если канал был закрыт с другого конца, а Receiver – если в канале больше нет сообщений и все Sender'ы были закрыты. Данное поведение выгодно отличает каналы в Rust от каналов в Go, где запись в закрытый канал сразу приводит к панике.
Использованный выше mpsc::channel() создает канал неограниченного размера. Обычно его лучше ограничивать. Для этого используйте mpsc::sync_channel(capacity). Запись в переполненный канал блокирует выполнение потока, тем самым обеспечивая back pressure. Это выгодно отличает каналы в Rust от очередей сообщений в Erlang, которые принципиально неограничены.
Атомарные переменные
В стандартную библиотеку Rust входят атомарные переменные. Пример их использования:
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;
static COUNTER: AtomicI32 = AtomicI32::new(0);
fn main() {
let mut handles = vec![];
for _ in 0..3 {
let handle = thread::spawn(|| {
for _ in 0..1000 {
COUNTER.fetch_add(1, Ordering::SeqCst);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let final_value = COUNTER.load(Ordering::SeqCst);
println!("Counter: {}", final_value);
}
Три потока инкрементируют значение атомарного счетчика 1000 раз. В итоге счетчик принимает значение 3000. Все довольно просто.
Единственный вопрос, который может возникнуть – а что такое Ordering::SeqCst? Данный параметр задает порядок доступа к памяти (memory ordering). Он определяет правила и ограничения на переупорядочивание операций с памятью. Если вы не специализируетесь на lock-free алгоритмах, то я бы рекомендовал наиболее строгий порядок, sequentially consistent. Именно он и используется в примере. Больше информации о порядке доступа к памяти можно почерпнуть, например, из главы про атомарные переменные в бесплатной онлайн-книге The Rustonomicon.
Также в стандартной библиотеке предусмотрены операции compare-and-swap. Заинтересованным читателям предлагается ознакомиться с ними самостоятельно, в качестве упражнения. А вот спинлоков не занесли. Сделано это намеренно, поскольку в 99% случаев предпочтительнее использовать мьютексы.
Мьютексы
В качестве примера использования мьютексов перепишем на них предыдущий пример:use std::sync::Mutex;
use std::thread;
static COUNTER: Mutex<i32> = Mutex::new(0);
fn main() {
let mut handles = vec![];
for _ in 0..3 {
let handle = thread::spawn(|| {
for _ in 0..1000 {
let mut num = COUNTER.lock().unwrap();
*num += 1;
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let final_value = *COUNTER.lock().unwrap();
println!("Counter: {}", final_value);
}
Метод .lock() возвращает MutexGuard<T>, обернутый в Result. Пока у потока есть MutexGuard<T>, он может работать с данными. При выходе из скоупа MutexGuard<T> освобождается, и мьютекс отпускается.
Спрашивается, а зачем нужен Result? Не может же захват лока потерпеть неудачу? Это сделано для случая, когда поток паникует во время удерживая мьютексов. В данном сценарии мьютексы отпускаются, но помечаются, как «отравленные» (poisoned). На отравленном мьютексе .lock() возвращает Err, однако мьютекс захватывается успешно. Таким образом, панику, произошедшую в другом потоке, можно обработать, проигнорировать или распространить далее. Только следует помнить, что данные под отравленным мьютексом могут быть неконсистентны.
Заметьте, что мьютексы в Rust не является реентерабельным (reentrant). Если поток, удерживающий мьютекс, попытается захватить его снова, то встанет в дэдлок. Rust защищает от многих ошибок, но не от дэдлоков. Последние возможны при использовании не только мьютексов.
Мьютексы в Rust довольно дешевы. Например, в Linux они основаны на futex (см man futex). При работе под другими ОС используются схожие механизмы.
RWLock
RWLock (read-write lock) похож на мьютекс, но различает захват на чтение и на запись. На чтение RWLock могут удерживать несколько потоков, но на запись – только один. RWLock имеет чуть большие накладные расходы, чем мьютекс. При большом количестве операций чтения обычно RWLock обеспечивает большую производительность, так как операции чтения не блокируют друг друга.
Пример использования:
use std::sync::RwLock;
use std::thread;
use std::time::Duration;
static COUNTER: RwLock<i32> = RwLock::new(0);
fn main() {
let mut handles = vec![];
for _ in 0..3 {
let handle = thread::spawn(|| {
for _ in 0..1000 {
thread::sleep(Duration::from_micros(100));
let mut num = COUNTER.write().unwrap();
*num += 1;
}
});
handles.push(handle);
}
loop {
let value = *COUNTER.read().unwrap();
println!("Counter: {}", value);
if value >= 3000 {
break;
}
thread::sleep(Duration::from_millis(10));
}
for handle in handles {
handle.join().unwrap();
}
}
В особых пояснениях данный код как будто бы не нуждается.
Заключение
За кадром остались условные переменные (condvar) и локальные хранилища потоков (thread local storage). Было решено не включать их в шпаргалку. По личному опыту, потребность в данных механихмах возникает не часто. Следует однако помнить, что Rust предлагает и их тоже.
Rust поощряет использование чистых функций и неизменяемых переменных. За счет этого код естественным образом распараллеливается. Например, если функция принимает два аргумента, то эти аргументы можно вычислить параллельно. Если код используем итераторы (map, filter, fold, zip, ...), то он естественным образом переносится на каналы. Данные идеи реализованы в крейте Rayon. Заинтересованным читателям предлагается ознакомиться с ним самостоятельно.
Полная версия исходников к данному посту доступна здесь. Как обычно, буду рад вашим вопросам и дополнениям.