Написание многопоточных приложений на C++

12 июля 2017

В более ранних постах было рассказано про многопоточность в Windows при помощи CreateThread и прочего WinAPI, а также многопоточность в Linux и других *nix системах при помощи pthreads. Если вы пишите на C++11 или более поздних версиях, то вам доступны std::thread и другие многопоточные примитивы, появившиеся в этом стандарте языка. Далее будет показано, как с ними работать. В отличие от WinAPI и pthreads, код, написанный на std::thread, является кроссплатформенным.

Примечание: Приведенный код был проверен на GCC 7.1 и Clang 4.0 под Arch Linux, GCC 5.4 и Clang 3.8 под Ubuntu 16.04 LTS, GCC 5.4 и Clang 3.8 под FreeBSD 11, а также Visual Studio Community 2017 под Windows 10. CMake до версии 3.8 не умеет говорить компилятору использовать стандарт C++17, указанный в свойствах проекта. Как установить CMake 3.8 в Ubuntu 16.04 описано здесь. Чтобы код компилировался при помощи Clang, в *nix системах должен быть установлен пакет libc++. Для Arch Linux пакет доступен на AUR. В Ubuntu есть пакет libc++-dev, но вы можете столкнуться с непофикшенным багом, из-за которого код так просто собираться не будет. Воркэраунд описан на StackOverflow. Во FreeBSD для компиляции проекта нужно установить пакет cmake-modules.

Мьютексы

Ниже приведен простейший пример использования трэдов и мьютексов:

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
static int counter = 0;
static const int MAX_COUNTER_VAL = 100;

void thread_proc(int tnum) {
    for(;;) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            if(counter == MAX_COUNTER_VAL)
                break;
            int ctr_val = ++counter;
            std::cout << "Thread " << tnum << ": counter = " <<
                         ctr_val << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    std::vector<std::thread> threads;
    for(int i = 0; i < 10; i++) {
        std::thread thr(thread_proc, i);
        threads.emplace_back(std::move(thr));
    }

    // can't use const auto& here since .join() is not marked const
    for(auto& thr : threads) {
        thr.join();
    }

    std::cout << "Done!" << std::endl;
    return 0;
}

Обратите внимание на оборачивание std::mutex в std::lock_guard в соответствии c идиомой RAII. Такой подход гарантирует, что мьютекс будет отпущен при выходе из скоупа в любом случае, в том числе при возникновении исключений. Для захвата сразу нескольких мьютексов с целью предотвращения дэдлоков существует класс std::scoped_lock. Однако он появился только в C++17 и потому может работать не везде. Для более ранних версий C++ есть аналогичный по функционалу шаблон std::lock, правда для корректного освобождения локов по RAII он требует написания дополнительного кода.

RWLock

Нередко возникает ситуация, в которой доступ к объекту чаще происходит на чтение, чем на запись. В этом случае вместо обычного мьютекса эффективнее использовать read-write lock, он же RWLock. RWLock может быть захвачен сразу несколькими потоками на чтение, или только одним потоком на запись. RWLock’у в C++ соответствуют классы std::shared_mutex и std::shared_timed_mutex:

#include <iostream>
#include <shared_mutex>
#include <thread>
#include <vector>

// std::shared_mutex mtx; // will not work with GCC 5.4
std::shared_timed_mutex mtx;

static int counter = 0;
static const int MAX_COUNTER_VAL = 100;

void thread_proc(int tnum) {
    for(;;) {
        {
            // see also std::shared_lock
            std::unique_lock<std::shared_timed_mutex> lock(mtx);
            if(counter == MAX_COUNTER_VAL)
                break;
            int ctr_val = ++counter;
            std::cout << "Thread " << tnum << ": counter = " <<
                         ctr_val << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    std::vector<std::thread> threads;
    for(int i = 0; i < 10; i++) {
        std::thread thr(thread_proc, i);
        threads.emplace_back(std::move(thr));
    }

    for(auto& thr : threads) {
        thr.join();
    }

    std::cout << "Done!" << std::endl;
    return 0;
}

По аналогии с std::lock_guard для захвата RWLock’а используются классы std::unique_lock и std::shared_lock, в зависимости от того, как мы хотим захватить лок. Класс std::shared_timed_mutex появился в C++14 и работает на всех* современных платформах (не скажу за мобильные устройства, игровые консоли, и так далее). В отличие от std::shared_mutex, он имеет методы try_lock_for, try_lock_unti и другие, которые пытаются захватить мьютекс в течение заданного времени. Я сильно подозреваю, что std::shared_mutex должен быть дешевле std::shared_timed_mutex. Однако std::shared_mutex появился только в C++17, а значит поддерживается не везде. В частности, все еще широко используемый GCC 5.4 про него не знает.

Thread Local Storage

Иногда бывает нужно создать переменную, вроде глобальной, но которую видит только один поток. Другие потоки тоже видят переменную, но у них она имеет свое локальное значение. Для этого придумали Thread Local Storage, или TLS (не имеет ничего общего с Transport Layer Security!). Помимо прочего, TLS может быть использован для существенного ускорения генерации псевдослучайных чисел. Пример использования TLS на C++:

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex io_mtx;
thread_local int counter = 0;
static const int MAX_COUNTER_VAL = 10;

void thread_proc(int tnum) {
    for(;;) {
        counter++;
        if(counter == MAX_COUNTER_VAL)
            break;
        {
            std::lock_guard<std::mutex> lock(io_mtx);
            std::cout << "Thread " << tnum << ": counter = " <<
                      counter << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    std::vector<std::thread> threads;
    for(int i = 0; i < 10; i++) {
        std::thread thr(thread_proc, i);
        threads.emplace_back(std::move(thr));
    }

    for(auto& thr : threads) {
        thr.join();
    }

    std::cout << "Done!" << std::endl;
    return 0;
}

Мьютекс здесь используется исключительно для синхронизации вывода в консоль. Для доступа к thread_local переменным никакая синхронизация не требуется.

Атомарные переменные

Атомарные переменные часто используются для выполнения простых операций без использования мьютексов. Например, вам нужно инкрементировать счетчик из нескольких потоков. Вместо того, чтобы оборачивать int в std::mutex, эффективнее воспользоваться std::atomic_int. Также C++ предлагает типы std::atomic_char, std::atomic_bool и многие другие. Еще на атомарных переменных реализуют lock-free алгоритмы и структуры данных. Стоит отметить, что они весьма сложны в разработке и отладке, и не на всех системах работают быстрее аналогичных алгоритмов и структур данных с локами.

Пример кода:

#include <atomic>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

static std::atomic_int atomic_counter(0);
static const int MAX_COUNTER_VAL = 100;

std::mutex io_mtx;

void thread_proc(int tnum) {
    for(;;) {
        {
            int ctr_val = ++atomic_counter;
            if(ctr_val >= MAX_COUNTER_VAL)
                break;

            {
                std::lock_guard<std::mutex> lock(io_mtx);
                std::cout << "Thread " << tnum << ": counter = " <<
                             ctr_val << std::endl;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    std::vector<std::thread> threads;

    int nthreads = std::thread::hardware_concurrency();
    if(nthreads == 0) nthreads = 2;

    for(int i = 0; i < nthreads; i++) {
        std::thread thr(thread_proc, i);
        threads.emplace_back(std::move(thr));
    }

    for(auto& thr : threads) {
        thr.join();
    }

    std::cout << "Done!" << std::endl;
    return 0;
}

Обратите внимание на использование процедуры hardware_concurrency. Она возвращает оценку количества трэдов, которое в текущей системе может выполняться параллельно. Например, на машине с четырехядерным процессором, поддерживающим hyper threading, процедура возвращает число 8. Также процедура может возвращать ноль, если сделать оценку не удалось или процедура попросту не реализована.

Кое-какую информацию о работе атомарных переменных на уровне ассемблера можно найти в заметке Шпаргалка по основным инструкциям ассемблера x86/x64.

Заключение

Насколько я вижу, все это действительно неплохо работает. То есть, при написании кроссплатформенных приложений на C++ про WinAPI и pthreads можно благополучно забыть. В чистом C начиная с C11 также существуют кроссплатформенные трэды. Но они все еще не поддерживаются Visual Studio (я проверил), и вряд ли когда-либо будут поддерживаться. Не секрет, что Microsoft не видит интереса в развитии поддержки языка C в своем компиляторе, предпочитая концентрироваться на C++.

За кадром осталось еще немало примитивов: std::condition_variable(_any), std::(shared_)future, std::promise, std::sync и другие. Для ознакомления с ними я рекомендую сайт cppreference.com.

Полная версия исходников к этой заметке, как обычно, лежит на GitHub. А как вы сейчас пишите многопоточные приложения на C++?

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.