Работа с нитями/потоками в Haskell

24 июня 2013

Как вы уже могли догадаться, я снова взялся за изучение Haskell. Очевидно, этот язык нельзя учить наскоком. Я решил запастись терпением и вникать во все медленно, но верно. Например, недавно я разбирался с многопоточностью.

Оказалось, что в Haskell используются легковесные потоки, так же как в Erlang, Go или OCaml‘овском Lwt. Взаимодействие между потоками происходит при помощи захватываемых изменяемых переменных (locking mutable variables), или MVars. В отличие, скажем, от Erlang, программа на Haskell завершается, когда свою работу завершает основной поток. Готовых средств, позволяющих узнать состояние других потоков, скажем, выполняется ли заданный поток или уже завершился, не предусмотрено. Это сделано с целью минимизации накладных расходов. При необходимости поверх «голых» потоков можно реализовать что угодно. Как и в Erlang, весь ввод и вывод в Haskell происходит через epoll/select. Благодаря этому, уперевшись в I/O, поток не блокирует остальные потоки, работающие на той же нитке операционной системы, как это происходит, скажем, в Akka.

Дополнение: На самом деле, «из коробки» есть как минимум функция forkFinally, позволяющая выполнить некие действия в случае завершения потока, в том числе как-то уведомить об этом другие потоки.

MVar можно представить, как разделяемую несколькими потоками переменную, доступ к которой ограничивается мьютексом. В отличие от аналогичной конструкции в C++ MVar также может быть пустым. В этом смысле MVar напоминает тип Maybe или очередь из одного элемента. Написанное выше становится более понятным при взгляде на типы основных функций для работы со всем этим хозяйством.

Создание нового потока производится с помощью функции forkIO:

forkIO :: IO () -> IO ThreadId

Новый MVar можно создать одним из следующих способов:

newEmptyMVar :: IO (MVar a)
newMVar :: a -> IO (MVar a)

Узнать, является ли MVar пустым, можно при помощи функции isEmptyMVar:

isEmptyMVar :: MVar a -> IO Bool

Для записи в MVar предназначена функция putMVar:

putMVar :: MVar a -> a -> IO ()

Если MVar уже заполнен, putMVar заблокирует текущий поток до тех пор, пока MVar не освободится. У функции putMVar есть еще одно важное свойство. Если несколько потоков будут заблокированы этой функцией, при очистке MVar будет разблокирован только один поток. Разблокирование потоков производится в порядке FIFO.

Функция для чтения из MVar:

takeMVar :: MVar a -> IO a

Аналогично putMVar, функция takeMVar блокирует текущий поток, если MVar пуст. Прочие свойства такие же.

Еще пара полезных функций с очевидной семантикой:

modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()

Функция modifyMVar реализована, как обертка над вызовами takeMVar и putMVar. Поэтому она не является атомарной в случае, если в MVar пишут несколько потоков. Однако если все потоки работают с переменной через modifyMVar, тогда все ОК.

Среди других полезных функций следует отметить readMVar, swapMVar, tryTakeMVar, tryPutMVar, yield и threadDelay (последняя есть только в GHC). За подробностями обращайтесь к документации.

На первый взгляд может показаться, что этот API несколько убог. В Erlang, например, для взаимодействия между процессами используются очереди, в которые можно класть куда более одного элемента. К тому же, эти элементы могут быть любого типа. Но если задуматься, MVar даже круче. Во-первых, он словно горшочек из-под меда. В нем можно держать все что угодно. Это может быть и очередь, и очередь с приоритетами, и очередь ограниченного размера и двунаправленный канал. Во-вторых, каждый поток может иметь сколько угодно таких очередей и каналов. Или наоборот, из одной очереди может читать множество потоков. Наконец, все это — со статической проверкой типов.

Довольно теории, давайте уже напишем какую-нибудь программу. Помнится, Erlang неплохо распараллеливал по всем ядрам процессора задачу взлома хэшей методом грубой силы. Посмотрим, как с этой же задачей справится Haskell.

Начнем с однопоточной версии программы, а затем распараллелим ее. Общий функционал для каждой из версий программы вынесем в отдельный модуль:

module Bruteforce.Common where

import Data.Word
import Text.Printf
import Data.Digest.SHA2

passwordList :: String -> Int -> [String]
passwordList charList len =
    stream beginState
  where
    beginState = replicate len charList
    endState = replicate len [ last charList ]
    nextState ((_:[]):xs) = charList : nextState xs
    nextState ((_:ys):xs) = ys : xs
    nextState x = error $ "nextState " ++ show x
    stream st =
      let pw = map head st in
      if st == endState then [ pw ]
                        else pw : stream (nextState st)

hash :: String -> String
hash =
  concatMap (printf "%02x" :: Word8 -> String) .
    toOctets . sha256Ascii

Очевидно, тут мы не гонимся за эффективностью. Куда важнее сейчас распараллеливание или его отсутствие. Обратите особое внимание на функцию passwordList. Она генерирует список паролей заданной длины, состоящих из указанных букв. В функции не используется хвостовая рекурсия, но благодаря ленивым вычислениям, память, тем не менее, расходуется эффективно.

Программа, работающая в один поток:

import Bruteforce.Common
 
main :: IO ()
main = do
  let hashList = [
        -- 1234
        "03ac674216f3e15c761ee1a5e255f067" ++
        "953623c8b388b4459e13f978d7c846f4",
        -- r2d2
        "8adce0a3431e8b11ef69e7f7765021d3" ++
        "ee0b70fff58e0480cadb4c468d78105f"
        ]
      pwLen = 4
      charList = ['0'..'9'] ++ ['a'..'z']
      pwHashList = [(pw,hash pw) | pw <- passwordList charList pwLen]
      rslt = [pw ++ ":" ++ h | (pw,h) <- pwHashList, h `elem` hashList]
  mapM_ putStrLn {- $ take (length hashList) -} rslt

На машине с четырехядерным процессором Intel Core i7-3770K частотой 3.5 GHz программа справляется с задачей за 12.7 секунд. Можно существенно ускорить ее, не производя перебор после того, как мы сломали все имеющиеся хэши (см закомментированную часть кода). Но в этом случае скорость программы будет зависит от порядка, в котором перебираются пароли, что нежелательно.

Программа, решающая ту же задачу в несколько потоков:

import Bruteforce.Common
import Control.Concurrent
import Control.Monad
import Control.DeepSeq

workerLoop :: MVar [String] -> MVar [ [String] ] -> String -> Int ->
                [String] -> IO ()
workerLoop taskQueue resultQueue charList pwLen hashList = do
  maybeTask <- modifyMVar taskQueue
                 (\q -> return $ case q of
                                   [] -> ([], Nothing)
                                   (x:xs) -> (xs, Just x))
  case maybeTask of
    Nothing -> return ()
    Just task -> do
      let postfixList = passwordList charList $ pwLen - length task
          pwList = map (task ++) postfixList
          pwHashList = [(pw, hash pw) | pw <- pwList]
          rslt = [pw ++ ":" ++ h | (pw,h) <- pwHashList,
                                    h `elem` hashList]
      rslt `deepseq` modifyMVar_ resultQueue (\q -> return $ rslt:q)
      workerLoop taskQueue resultQueue charList pwLen hashList

mainLoop :: MVar [ [String] ] -> Int -> IO ()
mainLoop _ 0 = return ()
mainLoop resultQueue taskNumber = do
  results <- modifyMVar resultQueue (\q -> return ([], q))
  case results of
    [] -> do
      threadDelay 100000 -- 100 ms
      mainLoop resultQueue taskNumber
    _ -> do
      mapM_ (mapM_ putStrLn) results
      mainLoop resultQueue (taskNumber - length results)

main :: IO ()
main = do
  let hashList = [
        -- 1234
        "03ac674216f3e15c761ee1a5e255f067" ++
        "953623c8b388b4459e13f978d7c846f4",
        -- r2d2
        "8adce0a3431e8b11ef69e7f7765021d3" ++
        "ee0b70fff58e0480cadb4c468d78105f"
        ]
      pwLen = 4
      chunkLen = 2
      charList = ['0'..'9'] ++ ['a'..'z']
      taskList = passwordList charList chunkLen
      taskNumber = length taskList
  workerNumber <- getNumCapabilities
  taskQueue <- newMVar taskList
  resultQueue <- newMVar []
  workerNumber `replicateM_` forkIO (workerLoop taskQueue resultQueue
                                             charList pwLen hashList)
  mainLoop resultQueue taskNumber

Функция getNumCapabilities возвращает количество потоков операционной системы, которое используется runtime system. Как будет показано ниже, это количество можно задавать при запуске программы. В данном случае логично использовать столько же легковесных потоков, сколько используется реальных.

Программа использует две очереди. В действительности это обычные списки, а никакие не очереди, но мне кажется, будет понятнее, если думать о них, как об очередях. Очередь taskQueue имеет тип MVar [String]. В ней содержатся задачи, представляющие собой префиксы паролей для перебора. Поток берет префикс из очереди (головы списка) и перебирает все пароли с этим префиксом. Очередь resultQueue имеет тип MVar [ [String] ]. Когда поток завершает работу над задачей, он помещает в эту очередь (также голову списка) список строк, которые он хотел бы вывести на экран.

Завершение главного потока приводит к завершению работы всей программы. Поэтому он должен завершится только после завершения дочерних потоков. Чтобы главный поток не висел без дела, дадим ему кое-какую работу. Работа эта заключается в том, чтобы разгребать resultQueue и выводить результат работы других потоков на экран. Строго говоря, мы вынуждены так делать, потому что ввод/вывод в Haskell не является thread safe. Заодно главный поток подсчитывает, сколько задач на данный момент осталось выполнить. Когда счетчик задач обнуляется, главный поток завершается.

Обратите внимание на то, как дочерние потоки пишут в resultQueue:

rslt `deepseq` modifyMVar_ resultQueue (\q -> return $ rslt:q)

Мы помним, что данные в Haskell по умолчанию являются ленивыми, и MVar — не исключение. Если мы просто запишем в resultQueue некоторый результат, его реальное вычисление произойдет в главном потоке. От параллелизма не будет никакого выигрыша! Поэтому мы принудительно вычисляем rslt с помощью deepseq и только после этого изменяем resultQueue. Тут нельзя не отметить существование пакета strict-concurrency, который предлагает те же MVars (а также каналы, речь о которых пойдет ниже), только со строгой семантикой.

Интересно, что в данном конкретном случае вместо deepseq можно использовать seq без заметных потерь в плане скорости. Это происходит благодаря условию h `elem` hashList в генераторе списка. Чтобы определить, какой элемент является головой списка, если эта голова вообще есть, нужно вычислить много хэш-функций.

Скомпилировав программу с ключом -threaded и запустив с параметрами +RTS -N8 мы обнаружим, что та же самая задача стала решаться за 3.4 секунды. Не ровно в четыре раза быстрее, как ожидалось, но близко. Это объясняется накладными расходами на поддержку самой многопоточности, которая была добавлена в программу благодаря компиляции с ключом -threaded. Убедиться в наличии таких накладных расходов можно, запустив многопоточную версию программы без параметров. В этом случае она выполнится за 14.3 секунды. Если в той же программе использовать только четыре потока, а не восемь, как было сделано ранее, она отработает за 3.8 секунды. Это объясняется тем, что используемый нами процессор поддерживает Hyper-Threading и количество логических ядер на нем как раз равно восьми.

Взаимодействие между потоками в Haskell также может осуществляться при помощи каналов. В первом приближении можно думать о каналах, как об очередях сообщений произвольной длины. Создание нового канала производится при помощи функции newChan:

newChan :: IO (Chan a)

Запись и чтение производятся с помощью функций writeChan и readChan соответственно. Вызов writeChan никогда не блокируется. Вызов readChan блокируется только в том случае, если канал пуст:

writeChan :: Chan a -> a -> IO ()
readChan :: Chan a -> IO a

Важную роль играет функция dupChan:

dupChan :: Chan a -> IO (Chan a)

Она создает дубликат канала. Пусть ch — это канал, а ch2 и ch3 — его дубликаты. Сразу после создания дубликат пуст, даже если в исходном канале были данные. При записи сообщения в ch, оно также появится в ch2 и ch3. С тем же успехом можно писать в ch2 или ch3, сообщение придет в каждый из трех каналов. Как видите, при помощи каналов очень удобно рассылать различного рода уведомления.

Важно понимать, что каналы и MVars не спасают нас от традиционных бед многопоточного программирования, например, тех же дэдлоков и состояния гонки. Для написания многопоточных приложений в Haskell имеется множество других средств. За кадром остались функции par и pseq, программная транзакционная память (software transactional memory, STM), Data Parallel Haskell, а также Cloud Haskell. Все эти вопросы заслуживают отдельных заметок.

Ссылки по теме:

Исходники к заметке вы найдете в этом архиве. Код собирается при помощи cabal.

Дополнение: Интересное отличие нитей в Haskell от нитей в других языках состоит в том, что в некоторых случаях Haskell умеет сам находить дэдлоки и посылать заблокированным нитям исключение BlockedIndefinitelyOnMVar. Например, следующая программа:

import Control.Concurrent

main = do
  m <- newEmptyMVar
  takeMVar m

… завершается с ошибкой «thread blocked indefinitely in an MVar operation». Впрочем, это не более, чем удобное отладочное средство. Полагаться на этот механизм в боевом окружении не следует.

Дополнение: Программная транзакционная память в Haskell

Метки: , , .


Вы можете прислать свой комментарий мне на почту, или воспользоваться комментариями в Telegram-группе.